Merge branch 'branch-2.0' into featurn_backup_cooldown_20_squash
justfortaste authored Dec 24, 2024
2 parents 2738ced + fdff4a6 commit 5e40545
Showing 76 changed files with 1,330 additions and 283 deletions.
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
@@ -409,6 +409,7 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
void signal_handler(int signal) {
if (signal == SIGINT || signal == SIGTERM) {
k_doris_exit = true;
k_doris_start = false;
LOG(INFO) << "doris start to exit";
}
}
1 change: 1 addition & 0 deletions be/src/common/daemon.h
@@ -27,6 +27,7 @@ namespace doris {

struct StorePath;
inline bool k_doris_exit = false;
inline bool k_doris_start = false;

class Daemon {
public:
1 change: 1 addition & 0 deletions be/src/exprs/bloom_filter_func.h
@@ -196,6 +196,7 @@ class BloomFilterFuncBase : public FilterFuncBase {
}

_bloom_filter_alloced = data_size;
_inited = true;
return _bloom_filter->init(data, data_size);
}

22 changes: 18 additions & 4 deletions be/src/http/http_client.cpp
@@ -26,6 +26,7 @@
#include "common/config.h"
#include "http/http_headers.h"
#include "http/http_status.h"
#include "util/security.h"
#include "util/stack_util.h"

namespace doris {
@@ -198,9 +199,11 @@ Status HttpClient::execute(const std::function<bool(const void* data, size_t len
_callback = &callback;
auto code = curl_easy_perform(_curl);
if (code != CURLE_OK) {
std::string url = mask_token(_get_url());
LOG(WARNING) << "fail to execute HTTP client, errmsg=" << _to_errmsg(code)
<< ", trace=" << get_stack_trace();
return Status::HttpError(_to_errmsg(code));
<< ", trace=" << get_stack_trace() << ", url=" << url;
std::string errmsg = fmt::format("{}, url={}", _to_errmsg(code), url);
return Status::HttpError(std::move(errmsg));
}
return Status::OK();
}
@@ -268,13 +271,22 @@ Status HttpClient::execute(std::string* response) {
return execute(callback);
}

const char* HttpClient::_to_errmsg(CURLcode code) {
const char* HttpClient::_to_errmsg(CURLcode code) const {
if (_error_buf[0] == 0) {
return curl_easy_strerror(code);
}
return _error_buf;
}

const char* HttpClient::_get_url() const {
const char* url = nullptr;
curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &url);
if (!url) {
url = "<unknown>";
}
return url;
}

Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
const std::function<Status(HttpClient*)>& callback) {
Status status;
@@ -286,7 +298,9 @@ Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
if (http_status == 200) {
return status;
} else {
auto error_msg = fmt::format("http status code is not 200, code={}", http_status);
std::string url = mask_token(client._get_url());
auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
http_status, url);
LOG(WARNING) << error_msg;
return Status::HttpError(error_msg);
}
3 changes: 2 additions & 1 deletion be/src/http/http_client.h
@@ -147,7 +147,8 @@ class HttpClient {
size_t on_response_data(const void* data, size_t length);

private:
const char* _to_errmsg(CURLcode code);
const char* _to_errmsg(CURLcode code) const;
const char* _get_url() const;

private:
CURL* _curl = nullptr;
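Together, the http_client.cpp and http_client.h changes make every failure path report the effective request URL with any token query parameter masked. A minimal usage sketch follows; the helper, URL, timeout and retry counts are illustrative and not part of this commit, and mask_token is assumed to come from the newly included "util/security.h":

#include <string>
#include "common/status.h"
#include "http/http_client.h"

// Hypothetical caller, not part of this commit: fetch a URL with retries and
// rely on the improved errors above, which now read "<errmsg>, url=<masked url>"
// on curl failures and "http status code is not 200, code=<code>, url=<masked url>"
// on non-200 responses.
doris::Status fetch_with_retry(const std::string& remote_url, std::string* response) {
    return doris::HttpClient::execute_with_retry(
            /*retry_times=*/3, /*sleep_time=*/1, [&](doris::HttpClient* client) {
                RETURN_IF_ERROR(client->init(remote_url));
                client->set_timeout_ms(10 * 1000);  // 10 s, illustrative
                return client->execute(response);   // overload shown above
            });
}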
59 changes: 58 additions & 1 deletion be/src/olap/compaction.cpp
@@ -524,6 +524,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
auto& fs = _output_rowset->rowset_meta()->fs();
auto& tablet_path = _tablet->tablet_path();

// After doing index compaction, need to add this size to rowset->total_size
int64_t compacted_index_file_size = 0;

// we choose the first destination segment name as the temporary index writer path
// Used to distinguish between different index compaction
auto index_writer_path = tablet_path + "/" + dest_index_files[0];
@@ -536,7 +539,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(),
[&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
&dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows,
&status, this](int32_t column_uniq_id) {
&status, &compacted_index_file_size, this](int32_t column_uniq_id) {
auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) {
LOG(WARNING) << "failed to do index compaction"
<< ". tablet=" << _tablet->tablet_id()
@@ -584,6 +587,25 @@ Status Compaction::do_compaction_impl(int64_t permits) {
error_handler(index_id, column_uniq_id);
status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
st.msg());
} else {
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_columnId
auto seg_path =
std::static_pointer_cast<BetaRowset>(_output_rowset)
->segment_file_path(i);
std::string index_path =
InvertedIndexDescriptor::get_index_file_name(seg_path,
index_id);
int64_t current_size = 0;
st = fs->file_size(index_path, &current_size);
if (!st.ok()) {
error_handler(index_id, column_uniq_id);
status = Status::Error<
ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
st.msg());
}
compacted_index_file_size += current_size;
}
}
} catch (CLuceneError& e) {
error_handler(index_id, column_uniq_id);
@@ -597,6 +619,41 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return status;
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(
_output_rowset->rowset_meta()->data_disk_size() + compacted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(
_output_rowset->rowset_meta()->total_disk_size() + compacted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
compacted_index_file_size);

DBUG_EXECUTE_IF("check_after_compaction_file_size", {
int64_t total_file_size = 0;
for (int i = 0; i < dest_segment_num; ++i) {
auto seg_path = std::static_pointer_cast<BetaRowset>(_output_rowset)
->segment_file_path(i);
int64_t current_size = 0;
RETURN_IF_ERROR(fs->file_size(seg_path, &current_size));
total_file_size += current_size;
for (auto& column : _cur_tablet_schema->columns()) {
const TabletIndex* index_meta =
_cur_tablet_schema->get_inverted_index(column.unique_id());
if (index_meta) {
std::string index_path = InvertedIndexDescriptor::get_index_file_name(
seg_path, index_meta->index_id());
RETURN_IF_ERROR(fs->file_size(index_path, &current_size));
total_file_size += current_size;
}
}
}
if (total_file_size != _output_rowset->rowset_meta()->data_disk_size()) {
Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
"total file size {} is not equal rowset meta size {}", total_file_size,
_output_rowset->rowset_meta()->data_disk_size());
}
LOG(INFO) << "succeed to check index compaction file size";
})

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->full_name()
<< ", input row number=" << _input_row_num
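Plainly stated, the compaction.cpp change accumulates the size of every inverted-index file written by index compaction into compacted_index_file_size and folds it into the rowset meta's data, total and index disk sizes; the new debug point then re-derives the expected total from the files on disk. The invariant it checks can be restated as below (an illustrative comment, not code from this commit):

// For the output rowset after index compaction:
//   data_disk_size == sum over segments of (segment file size
//                     + sum over indexed columns of inverted index file size),
// where each index file path comes from
//   InvertedIndexDescriptor::get_index_file_name(seg_path, index_meta->index_id()).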
15 changes: 13 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -164,8 +164,19 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key,
if (tablet_index) {
opts.need_bloom_filter = true;
opts.is_ngram_bf_index = true;
opts.gram_size = tablet_index->get_gram_size();
opts.gram_bf_size = tablet_index->get_gram_bf_size();
//narrow convert from int32_t to uint8_t and uint16_t which is dangerous
auto gram_size = tablet_index->get_gram_size();
auto gram_bf_size = tablet_index->get_gram_bf_size();
if (gram_size > 256 || gram_size < 1) {
return Status::NotSupported("Do not support ngram bloom filter for ngram_size: ",
gram_size);
}
if (gram_bf_size > 65535 || gram_bf_size < 64) {
return Status::NotSupported("Do not support ngram bloom filter for bf_size: ",
gram_bf_size);
}
opts.gram_size = gram_size;
opts.gram_bf_size = gram_bf_size;
}

opts.need_bitmap_index = column.has_bitmap_index();
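The bounds check added in segment_writer.cpp exists because, per the commit's own comment, opts.gram_size and opts.gram_bf_size are narrower unsigned fields (uint8_t and uint16_t are assumed here, as the comment states) than the int32_t values read from the tablet index, so oversized values were previously truncated silently instead of rejected. A standalone illustration of that hazard, not code from this commit:

#include <cstdint>
#include <iostream>

int main() {
    int32_t gram_bf_size = 65536;   // one past what uint16_t can hold
    uint16_t bf_stored = static_cast<uint16_t>(gram_bf_size);

    int32_t gram_size = 300;        // larger than uint8_t can hold
    uint8_t gram_stored = static_cast<uint8_t>(gram_size);

    // Prints "0 44": both values wrapped modulo the destination type's range,
    // which is why the new code rejects them up front with Status::NotSupported.
    std::cout << bf_stored << " " << static_cast<int>(gram_stored) << std::endl;
    return 0;
}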
12 changes: 7 additions & 5 deletions be/src/olap/single_replica_compaction.cpp
@@ -37,6 +37,7 @@
#include "task/engine_clone_task.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
#include "util/security.h"
#include "util/thrift_rpc_helper.h"
#include "util/trace.h"

Expand Down Expand Up @@ -390,7 +391,7 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
// then it will try to clone from BE 2, but it will find the file 1 already exist, but file 1 with same
// name may have different versions.
VLOG_DEBUG << "single replica compaction begin to download files, remote path="
<< remote_url_prefix << " local_path=" << local_path;
<< mask_token(remote_url_prefix) << " local_path=" << local_path;
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(local_path));
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(local_path));

@@ -448,9 +449,9 @@

std::string local_file_path = local_path + file_name;

LOG(INFO) << "single replica compaction begin to download file from: " << remote_file_url
<< " to: " << local_file_path << ". size(B): " << file_size
<< ", timeout(s): " << estimate_timeout;
LOG(INFO) << "single replica compaction begin to download file from: "
<< mask_token(remote_file_url) << " to: " << local_file_path
<< ". size(B): " << file_size << ", timeout(s): " << estimate_timeout;

auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
file_size](HttpClient* client) {
@@ -462,7 +463,8 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
uint64_t local_file_size = std::filesystem::file_size(local_file_path);
if (local_file_size != file_size) {
LOG(WARNING) << "download file length error"
<< ", remote_path=" << remote_file_url << ", file_size=" << file_size
<< ", remote_path=" << mask_token(remote_file_url)
<< ", file_size=" << file_size
<< ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not equal");
}
4 changes: 2 additions & 2 deletions be/src/olap/single_replica_compaction.h
@@ -58,10 +58,10 @@ class SingleReplicaCompaction : public Compaction {
Status _download_files(DataDir* data_dir, const std::string& remote_url_prefix,
const std::string& local_path);
Status _release_snapshot(const std::string& ip, int port, const std::string& snapshot_path);
Status _finish_clone(const string& clone_dir, const Version& version);
Status _finish_clone(const std::string& clone_dir, const Version& version);
CompactionType _compaction_type;

DISALLOW_COPY_AND_ASSIGN(SingleReplicaCompaction);
};

} // namespace doris
} // namespace doris
17 changes: 6 additions & 11 deletions be/src/olap/task/engine_clone_task.cpp
@@ -31,7 +31,6 @@
#include <memory>
#include <mutex>
#include <ostream>
#include <regex>
#include <set>
#include <shared_mutex>
#include <system_error>
@@ -63,6 +62,7 @@
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/network_util.h"
#include "util/security.h"
#include "util/stopwatch.hpp"
#include "util/thrift_rpc_helper.h"
#include "util/trace.h"
@@ -410,7 +410,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
_clone_req.table_id, _clone_req.partition_id, _clone_req.schema_hash, false, 0);
} else {
LOG_WARNING("failed to download snapshot from remote BE")
.tag("url", _mask_token(remote_url_prefix))
.tag("url", mask_token(remote_url_prefix))
.error(status);
}

@@ -554,11 +554,11 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re

std::string local_file_path = local_path + "/" + file_name;

LOG(INFO) << "clone begin to download file from: " << _mask_token(remote_file_url)
LOG(INFO) << "clone begin to download file from: " << mask_token(remote_file_url)
<< " to: " << local_file_path << ". size(B): " << file_size
<< ", timeout(s): " << estimate_timeout;

auto download_cb = [this, &remote_file_url, estimate_timeout, &local_file_path,
auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_file_url));
client->set_timeout_ms(estimate_timeout * 1000);
@@ -574,7 +574,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re
}
if (local_file_size != file_size) {
LOG(WARNING) << "download file length error"
<< ", remote_path=" << _mask_token(remote_file_url)
<< ", remote_path=" << mask_token(remote_file_url)
<< ", file_size=" << file_size
<< ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not equal");
@@ -602,7 +602,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re

/// This method will only be called if tablet already exist in this BE when doing clone.
/// This method will do the following things:
/// 1. Linke all files from CLONE dir to tablet dir if file does not exist in tablet dir
/// 1. Link all files from CLONE dir to tablet dir if file does not exist in tablet dir
/// 2. Call _finish_xx_clone() to revise the tablet meta.
Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_dir,
int64_t committed_version, bool is_incremental_clone) {
@@ -867,9 +867,4 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
// TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica
}

std::string EngineCloneTask::_mask_token(const std::string& str) {
std::regex pattern("token=[\\w|-]+");
return regex_replace(str, pattern, "token=******");
}

} // namespace doris
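The private EngineCloneTask::_mask_token removed above is superseded by a shared mask_token helper that the touched files now pull in from "util/security.h". That shared implementation is not part of this diff; below is a minimal sketch, assuming it mirrors the regex-based version deleted here:

// util/security.h (sketch only; the actual header in the repository may differ)
#pragma once

#include <regex>
#include <string>

namespace doris {

// Replace any "token=<value>" query parameter with a fixed mask so that
// tokens never end up in logs or error messages.
inline std::string mask_token(const std::string& str) {
    static const std::regex pattern("token=[\\w|-]+");
    return std::regex_replace(str, pattern, "token=******");
}

} // namespace doris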
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_clone_task.h
@@ -85,8 +85,6 @@ class EngineCloneTask : public EngineTask {

Status _release_snapshot(const std::string& ip, int port, const std::string& snapshot_path);

std::string _mask_token(const std::string& str);

private:
const TCloneReq& _clone_req;
vector<TTabletInfo>* _tablet_infos;
9 changes: 5 additions & 4 deletions be/src/runtime/thread_context.h
@@ -100,6 +100,7 @@ class MemTracker;
class RuntimeState;

extern bool k_doris_exit;
extern bool k_doris_start;
extern bthread_key_t btls_key;

// Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error,
Expand Down Expand Up @@ -388,17 +389,17 @@ class AddThreadMemTrackerConsumer {
// which is different from the previous behavior.
#define CONSUME_MEM_TRACKER(size) \
do { \
if (doris::thread_context_ptr.init) { \
if (doris::k_doris_start && doris::thread_context_ptr.init) { \
doris::thread_context()->consume_memory(size); \
} else if (doris::ExecEnv::GetInstance()->initialized()) { \
} else if (doris::k_doris_start && doris::ExecEnv::GetInstance()->initialized()) { \
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \
} \
} while (0)
#define RELEASE_MEM_TRACKER(size) \
do { \
if (doris::thread_context_ptr.init) { \
if (doris::k_doris_start && doris::thread_context_ptr.init) { \
doris::thread_context()->consume_memory(-size); \
} else if (doris::ExecEnv::GetInstance()->initialized()) { \
} else if (doris::k_doris_start && doris::ExecEnv::GetInstance()->initialized()) { \
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak( \
-size); \
} \
2 changes: 2 additions & 0 deletions be/src/service/doris_main.cpp
@@ -82,6 +82,7 @@ int __llvm_profile_write_file();

namespace doris {
extern bool k_doris_exit;
extern bool k_doris_start;

static void thrift_output(const char* x) {
LOG(WARNING) << "thrift internal message: " << x;
@@ -463,6 +464,7 @@ int main(int argc, char** argv) {

// init exec env
auto exec_env = doris::ExecEnv::GetInstance();
doris::k_doris_start = true;
doris::ExecEnv::init(exec_env, paths);
doris::TabletSchemaCache::create_global_schema_cache();
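
Read together with the daemon.cpp, daemon.h and thread_context.h hunks above, the new flag bounds the window in which the memory-tracking macros are active: main() raises k_doris_start just before ExecEnv::init, and the signal handler lowers it again on SIGINT/SIGTERM so that late allocations and frees during shutdown bypass the trackers. A condensed, illustrative sketch of that lifecycle (not a literal excerpt from any one file):

#include <csignal>

// Condensed from the hunks in this commit; names match the diff, flow is illustrative.
inline bool k_doris_exit = false;
inline bool k_doris_start = false;   // daemon.h

void signal_handler(int sig) {       // daemon.cpp
    if (sig == SIGINT || sig == SIGTERM) {
        k_doris_exit = true;
        k_doris_start = false;       // CONSUME/RELEASE_MEM_TRACKER become no-ops
    }
}

int main() {                         // doris_main.cpp
    k_doris_start = true;            // set right before ExecEnv::init(exec_env, paths)
    // ... ExecEnv::init, start services, loop until k_doris_exit ...
    return 0;
}

// thread_context.h: both macros are now additionally guarded by k_doris_start, e.g.
//   if (doris::k_doris_start && doris::thread_context_ptr.init) { ... }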

