Merge branch 'branch-2.0' into branch-2.0-pt
CalvinKirs authored Jan 2, 2024
2 parents f798647 + 65daf02 commit e0b0030
Showing 156 changed files with 13,314 additions and 815 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
@@ -58,7 +58,6 @@ github:
           - Build Broker
           - Build Documents
           - BE UT (Clang)
-          - ShellCheck
           - clickbench (benchmark)
           - BE UT (macOS)
           - Build Third Party Libraries (Linux)

2 changes: 1 addition & 1 deletion .gitmodules
@@ -23,4 +23,4 @@
 [submodule "be/src/clucene"]
 	path = be/src/clucene
 	url = https://github.com/apache/doris-thirdparty.git
-	branch = clucene
+	branch = clucene-2.0

10 changes: 10 additions & 0 deletions be/src/agent/task_worker_pool.cpp
@@ -470,6 +470,16 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
                     tablet_meta_info.time_series_compaction_time_threshold_seconds);
             need_to_save = true;
         }
+        if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
+            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+                status = Status::InvalidArgument(
+                        "only the time series compaction policy supports time series configs");
+                continue;
+            }
+            tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold(
+                    tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
+            need_to_save = true;
+        }
         if (tablet_meta_info.__isset.replica_id) {
             tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
         }

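The new block follows the Thrift optional-field convention: a setting is applied only when its __isset flag shows the client actually sent it, and a policy check rejects settings that only make sense for time series compaction. A minimal sketch of that guard pattern; the struct and field names below are hypothetical stand-ins, not the actual Doris Thrift definitions:

    #include <cstdint>
    #include <iostream>
    #include <string>

    // Hypothetical stand-in for a Thrift-generated struct: Thrift emits an
    // __isset bitfield so receivers can tell "field not sent" apart from
    // "field sent with its default value".
    struct TTabletMetaInfo {
        struct Isset {
            bool time_series_compaction_empty_rowsets_threshold = false;
        } __isset;
        int64_t time_series_compaction_empty_rowsets_threshold = 0;
    };

    void apply_update(const TTabletMetaInfo& info, const std::string& policy) {
        if (!info.__isset.time_series_compaction_empty_rowsets_threshold) {
            return; // field absent: leave the tablet meta untouched
        }
        if (policy != "time_series") {
            // mirrors the InvalidArgument + continue in the worker loop above
            std::cerr << "only the time series compaction policy supports this setting\n";
            return;
        }
        std::cout << "threshold set to "
                  << info.time_series_compaction_empty_rowsets_threshold << "\n";
    }

    int main() {
        TTabletMetaInfo info;
        info.__isset.time_series_compaction_empty_rowsets_threshold = true;
        info.time_series_compaction_empty_rowsets_threshold = 5;
        apply_update(info, "time_series"); // prints: threshold set to 5
    }
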
7 changes: 6 additions & 1 deletion be/src/common/config.cpp
@@ -996,14 +996,19 @@ DEFINE_String(inverted_index_query_cache_limit, "10%");
 
 // inverted index
 DEFINE_mDouble(inverted_index_ram_buffer_size, "512");
+// -1 disables the limit.
+// Normally we should not change this; it is useful for testing.
+DEFINE_mInt32(inverted_index_max_buffered_docs, "-1");
 DEFINE_Int32(query_bkd_inverted_index_limit_percent, "5"); // 5%
 // dict path for chinese analyzer
 DEFINE_String(inverted_index_dict_path, "${DORIS_HOME}/dict");
 DEFINE_Int32(inverted_index_read_buffer_size, "4096");
 // tree depth for bkd index
 DEFINE_Int32(max_depth_in_bkd_tree, "32");
 // index compaction
-DEFINE_Bool(inverted_index_compaction_enable, "false");
+DEFINE_mBool(inverted_index_compaction_enable, "false");
+// index by RAM directory
+DEFINE_mBool(inverted_index_ram_dir_enable, "false");
 // use num_broadcast_buffer blocks as buffer to do broadcast
 DEFINE_Int32(num_broadcast_buffer, "32");
 // semi-structure configs

5 changes: 4 additions & 1 deletion be/src/common/config.h
@@ -1037,10 +1037,13 @@ DECLARE_Int32(query_bkd_inverted_index_limit_percent); // 5%
 // dict path for chinese analyzer
 DECLARE_String(inverted_index_dict_path);
 DECLARE_Int32(inverted_index_read_buffer_size);
+DECLARE_mInt32(inverted_index_max_buffered_docs);
 // tree depth for bkd index
 DECLARE_Int32(max_depth_in_bkd_tree);
 // index compaction
-DECLARE_Bool(inverted_index_compaction_enable);
+DECLARE_mBool(inverted_index_compaction_enable);
+// index by RAM directory
+DECLARE_mBool(inverted_index_ram_dir_enable);
 // use num_broadcast_buffer blocks as buffer to do broadcast
 DECLARE_Int32(num_broadcast_buffer);
 // semi-structure configs

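Both files swap DEFINE_Bool/DECLARE_Bool for the m-prefixed variants; in Doris the m prefix marks a config as mutable at runtime rather than fixed at startup. A rough sketch of what such a macro pair might expand to, using invented names (the real macros in common/config.h are considerably more elaborate):

    #include <atomic>
    #include <iostream>

    // Invented stand-ins: a mutable config is backed by an atomic so an admin
    // endpoint can flip it while query threads keep reading a coherent value.
    #define SKETCH_DEFINE_MBOOL(name, default_value) \
        std::atomic<bool> conf_##name { default_value }
    #define SKETCH_DECLARE_MBOOL(name) extern std::atomic<bool> conf_##name

    SKETCH_DEFINE_MBOOL(inverted_index_compaction_enable, false);

    int main() {
        std::cout << conf_inverted_index_compaction_enable.load() << "\n"; // 0
        // flipped at runtime, no restart required
        conf_inverted_index_compaction_enable.store(true);
        std::cout << conf_inverted_index_compaction_enable.load() << "\n"; // 1
    }
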
27 changes: 17 additions & 10 deletions be/src/exec/es/es_scroll_parser.cpp
@@ -471,7 +471,13 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
     if (obj.HasMember("fields")) {
         pure_doc_value = true;
     }
-    const rapidjson::Value& line = obj.HasMember(FIELD_SOURCE) ? obj[FIELD_SOURCE] : obj["fields"];
+    // obj may have neither a `_source` nor a `fields` member.
+    const rapidjson::Value* line = nullptr;
+    if (obj.HasMember(FIELD_SOURCE)) {
+        line = &obj[FIELD_SOURCE];
+    } else if (obj.HasMember("fields")) {
+        line = &obj["fields"];
+    }
 
     for (int i = 0; i < tuple_desc->slots().size(); ++i) {
         const SlotDescriptor* slot_desc = tuple_desc->slots()[i];
@@ -496,17 +502,18 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
         const char* col_name = pure_doc_value ? docvalue_context.at(slot_desc->col_name()).c_str()
                                               : slot_desc->col_name().c_str();
 
-        rapidjson::Value::ConstMemberIterator itr = line.FindMember(col_name);
-        if (itr == line.MemberEnd() && slot_desc->is_nullable()) {
-            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
-            nullable_column->insert_data(nullptr, 0);
-            continue;
-        } else if (itr == line.MemberEnd() && !slot_desc->is_nullable()) {
-            std::string details = strings::Substitute(INVALID_NULL_VALUE, col_name);
-            return Status::RuntimeError(details);
+        if (line == nullptr || line->FindMember(col_name) == line->MemberEnd()) {
+            if (slot_desc->is_nullable()) {
+                auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
+                nullable_column->insert_data(nullptr, 0);
+                continue;
+            } else {
+                std::string details = strings::Substitute(INVALID_NULL_VALUE, col_name);
+                return Status::RuntimeError(details);
+            }
         }
 
-        const rapidjson::Value& col = line[col_name];
+        const rapidjson::Value& col = (*line)[col_name];
 
         PrimitiveType type = slot_desc->type().type;
 

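The fix replaces a reference that had to bind to some member with a pointer that may legitimately stay null when a hit carries neither _source nor fields. A standalone sketch of the same null-safe lookup against rapidjson, using an invented document shape:

    #include <iostream>
    #include "rapidjson/document.h"

    int main() {
        rapidjson::Document doc;
        doc.Parse(R"({"fields": {"k1": 10}})"); // no "_source" member here

        // Prefer "_source", fall back to "fields", and tolerate both being absent.
        const rapidjson::Value* line = nullptr;
        if (doc.HasMember("_source")) {
            line = &doc["_source"];
        } else if (doc.HasMember("fields")) {
            line = &doc["fields"];
        }

        // A null line or a missing column becomes "no value" instead of undefined behavior.
        const char* col_name = "k1";
        if (line == nullptr || line->FindMember(col_name) == line->MemberEnd()) {
            std::cout << col_name << ": null\n";
            return 0;
        }
        std::cout << col_name << ": " << (*line)[col_name].GetInt() << "\n";
    }
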
14 changes: 14 additions & 0 deletions be/src/http/action/stream_load.cpp
@@ -285,6 +285,20 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
         }
     }
 
+    if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
+                  !ctx->is_chunked_transfer))) {
+        LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
+                        "content_length or transfer-encoding=chunked";
+        return Status::InvalidArgument(
+                "content_length is empty and transfer-encoding!=chunked, please set content_length "
+                "or transfer-encoding=chunked");
+    } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
+                        ctx->is_chunked_transfer)) {
+        LOG(WARNING) << "please do not set both content_length and transfer-encoding";
+        return Status::InvalidArgument(
+                "please do not set both content_length and transfer-encoding");
+    }
+
     if (!http_req->header(HTTP_TIMEOUT).empty()) {
         try {
             ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));

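The added guard enforces that a stream load request carries exactly one body-framing mechanism, either Content-Length or Transfer-Encoding: chunked; with neither the server cannot know the body size, and with both the framing is ambiguous. A minimal sketch of that decision rule as a pure function (names are illustrative, not the Doris API):

    #include <iostream>
    #include <string>

    // Returns an error message, or an empty string if the header combination is valid.
    std::string check_framing(const std::string& content_length, bool is_chunked) {
        if (content_length.empty() && !is_chunked) {
            return "content_length is empty and transfer-encoding != chunked";
        }
        if (!content_length.empty() && is_chunked) {
            return "do not set both content_length and transfer-encoding";
        }
        return "";
    }

    int main() {
        std::cout << check_framing("", false) << "\n";    // invalid: no framing at all
        std::cout << check_framing("1024", true) << "\n"; // invalid: both mechanisms set
        std::cout << (check_framing("1024", false).empty() ? "ok" : "bad") << "\n"; // valid
    }
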
8 changes: 4 additions & 4 deletions be/src/index-tools/index_tool.cpp
@@ -30,7 +30,7 @@
 #include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
 
 using doris::segment_v2::DorisCompoundReader;
-using doris::segment_v2::DorisCompoundDirectory;
+using doris::segment_v2::DorisCompoundDirectoryFactory;
 using doris::io::FileInfo;
 using namespace lucene::analysis;
 using namespace lucene::index;
@@ -150,7 +150,7 @@ int main(int argc, char** argv) {
         auto fs = doris::io::global_local_filesystem();
         try {
             lucene::store::Directory* dir =
-                    DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+                    DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str());
             auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
             std::vector<std::string> files;
             std::cout << "Nested files for " << file_str << std::endl;
@@ -173,7 +173,7 @@ int main(int argc, char** argv) {
         auto fs = doris::io::global_local_filesystem();
         try {
             lucene::store::Directory* dir =
-                    DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+                    DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str());
             auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
             std::cout << "Term statistics for " << file_str << std::endl;
             std::cout << "==================================" << std::endl;
@@ -190,7 +190,7 @@ int main(int argc, char** argv) {
         auto fs = doris::io::global_local_filesystem();
         try {
             lucene::store::Directory* dir =
-                    DorisCompoundDirectory::getDirectory(fs, FLAGS_directory.c_str());
+                    DorisCompoundDirectoryFactory::getDirectory(fs, FLAGS_directory.c_str());
             if (FLAGS_idx_file_name == "") {
                 // try to search across all files in the directory
                 std::vector<FileInfo> files;

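Every call site changes mechanically from DorisCompoundDirectory::getDirectory to DorisCompoundDirectoryFactory::getDirectory, which suggests directory construction has been moved behind a factory type. A generic sketch of that shape, with simplified, hypothetical types:

    #include <memory>
    #include <string>

    // Simplified, hypothetical shapes: a factory centralizes construction so
    // call sites depend on the interface, not on a concrete directory class.
    struct Directory {
        virtual ~Directory() = default;
        virtual std::string path() const = 0;
    };

    struct LocalDirectory : Directory {
        explicit LocalDirectory(std::string p) : _p(std::move(p)) {}
        std::string path() const override { return _p; }
        std::string _p;
    };

    struct DirectoryFactory {
        // Mirrors the getDirectory(fs, path) call sites in the diff above.
        static std::unique_ptr<Directory> getDirectory(const std::string& path) {
            return std::make_unique<LocalDirectory>(path);
        }
    };

    int main() {
        auto dir = DirectoryFactory::getDirectory("/tmp/idx");
        return dir->path().empty();
    }
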
41 changes: 35 additions & 6 deletions be/src/olap/compaction.cpp
@@ -403,6 +403,34 @@ Status Compaction::do_compaction_impl(int64_t permits) {
 
     if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable) {
         OlapStopWatch inverted_watch;
+
+        // check rowid_conversion correctness
+        Version version = _tablet->max_version();
+        DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
+        std::set<RowLocation> missed_rows;
+        std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
+        // Convert the delete bitmap of the input rowsets to the output rowset.
+        std::size_t missed_rows_size = 0;
+        _tablet->calc_compaction_output_rowset_delete_bitmap(
+                _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows,
+                &location_map, _tablet->tablet_meta()->delete_bitmap(),
+                &output_rowset_delete_bitmap);
+        if (!allow_delete_in_cumu_compaction()) {
+            missed_rows_size = missed_rows.size();
+            if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
+                stats.merged_rows != missed_rows_size) {
+                std::string err_msg = fmt::format(
+                        "cumulative compaction: the merged rows({}) is not equal to missed "
+                        "rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
+                        stats.merged_rows, missed_rows_size, _tablet->tablet_id(),
+                        _tablet->table_id());
+                DCHECK(false) << err_msg;
+                LOG(WARNING) << err_msg;
+            }
+        }
+
+        RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));
+
         // translation vec
         // <<dest_idx_num, dest_docId>>
         // the first level vector: index indicates src segment.
@@ -428,7 +456,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         // src index files
         // format: rowsetId_segmentId
         std::vector<std::string> src_index_files(src_segment_num);
-        for (auto m : src_seg_to_id_map) {
+        for (const auto& m : src_seg_to_id_map) {
            std::pair<RowsetId, uint32_t> p = m.first;
            src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second);
        }
@@ -605,7 +633,8 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
                     std::string dir_str = p.parent_path().string();
                     std::string file_str = p.filename().string();
                     lucene::store::Directory* dir =
-                            DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+                            DorisCompoundDirectoryFactory::getDirectory(
+                                    fs, dir_str.c_str());
                     DorisCompoundReader reader(dir, file_str.c_str());
                     std::vector<std::string> files;
                     reader.list(&files);
@@ -677,11 +706,11 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
         // of incremental data later.
         // TODO(LiaoXin): check if there are duplicate keys
         std::size_t missed_rows_size = 0;
-        _tablet->calc_compaction_output_rowset_delete_bitmap(
-                _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows,
-                &location_map, _tablet->tablet_meta()->delete_bitmap(),
-                &output_rowset_delete_bitmap);
         if (!allow_delete_in_cumu_compaction()) {
+            _tablet->calc_compaction_output_rowset_delete_bitmap(
+                    _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows,
+                    &location_map, _tablet->tablet_meta()->delete_bitmap(),
+                    &output_rowset_delete_bitmap);
             missed_rows_size = missed_rows.size();
             if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && stats != nullptr &&
                 stats->merged_rows != missed_rows_size) {

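The context comments above describe the rowid translation structure: the outer index is the source segment, and each entry maps a source row to its destination segment and doc id. A small sketch of that structure, together with the merged-rows versus missed-rows consistency count the new check asserts (shapes are illustrative only):

    #include <cstdint>
    #include <iostream>
    #include <utility>
    #include <vector>

    // trans_vec[src_segment][src_doc_id] = {dest segment index, dest doc id},
    // with a sentinel for rows dropped by the merge.
    using DestLoc = std::pair<int32_t, int32_t>;
    constexpr DestLoc kDeleted = {-1, -1};

    int main() {
        // two source segments of three rows each; row 1 of segment 0 was merged away
        std::vector<std::vector<DestLoc>> trans_vec = {
                {{0, 0}, kDeleted, {0, 1}},
                {{0, 2}, {0, 3}, {0, 4}},
        };

        // count dropped rows the same way the compaction check counts missed rows
        std::size_t missed = 0;
        for (const auto& seg : trans_vec) {
            for (const auto& loc : seg) {
                if (loc == kDeleted) ++missed;
            }
        }
        std::cout << "missed rows: " << missed << "\n"; // must match the merger's merged_rows
    }
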
36 changes: 33 additions & 3 deletions be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -63,6 +63,13 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         return 0;
     }
 
+    // If there is a continuous run of empty rowsets, prioritize merging them.
+    auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets(
+            tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
+    if (!consecutive_empty_rowsets.empty()) {
+        return score;
+    }
+
     // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
     int64_t compaction_goal_size_mbytes =
             tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
@@ -149,6 +156,13 @@ void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
             break;
         }
 
+        // check if the rowset has been compacted but is an empty rowset
+        if (!is_delete && rs->version().first != 0 && rs->version().first != rs->version().second &&
+            rs->num_segments() == 0) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
        // covers one situation: when the rowset is not a delete, is a singleton delta, and is NONOVERLAPPING, ret_cumulative_point increases
        prev_version = rs->version().second;
        *ret_cumulative_point = prev_version + 1;
@@ -166,6 +180,19 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
         return 0;
     }
 
+    // If there are many consecutive empty rowsets, they should be compacted promptly.
+    auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets(
+            tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
+    if (!consecutive_empty_rowsets.empty()) {
+        VLOG_NOTICE << "tablet is " << tablet->tablet_id()
+                    << ", there are too many consecutive empty rowsets, size is "
+                    << consecutive_empty_rowsets.size();
+        input_rowsets->clear();
+        input_rowsets->insert(input_rowsets->end(), consecutive_empty_rowsets.begin(),
+                              consecutive_empty_rowsets.end());
+        return 0;
+    }
+
     int transient_size = 0;
     *compaction_score = 0;
     input_rowsets->clear();
@@ -175,8 +202,9 @@
     // BE1 should perform compaction on its own; the time series compaction may re-compact previously fetched rowsets.
     // the time series compaction policy needs to skip over the fetched rowsets
     const auto& first_rowset_iter = std::find_if(
-            candidate_rowsets.begin(), candidate_rowsets.end(),
-            [](const RowsetSharedPtr& rs) { return rs->start_version() == rs->end_version(); });
+            candidate_rowsets.begin(), candidate_rowsets.end(), [](const RowsetSharedPtr& rs) {
+                return rs->start_version() == rs->end_version() || rs->num_segments() == 0;
+            });
     for (auto it = first_rowset_iter; it != candidate_rowsets.end(); ++it) {
         const auto& rowset = *it;
         // check whether this rowset is a delete version
@@ -254,10 +282,12 @@ void TimeSeriesCumulativeCompactionPolicy::update_cumulative_point(
 void TimeSeriesCumulativeCompactionPolicy::update_cumulative_point(
         Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
         RowsetSharedPtr output_rowset, Version& last_delete_version) {
-    if (tablet->tablet_state() != TABLET_RUNNING) {
+    if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) {
         // if the tablet is under an alter process, do not update the cumulative point
+        // if the merged output rowset is empty, do not update the cumulative point
         return;
     }
+
     tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
 }
 

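The new branches all hinge on pick_first_consecutive_empty_rowsets, whose implementation is not part of this diff. A hypothetical reimplementation of the idea, returning the indices of the first run of at least threshold adjacent empty rowsets:

    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Hypothetical sketch: a real rowset carries versions and segments; a plain
    // segment count per rowset is enough to show the run-finding logic.
    std::vector<int> pick_first_consecutive_empty(const std::vector<int>& num_segments,
                                                  std::size_t threshold) {
        std::size_t run_start = 0, run_len = 0;
        for (std::size_t i = 0; i < num_segments.size(); ++i) {
            if (num_segments[i] == 0) {
                if (run_len == 0) run_start = i;
                ++run_len;
            } else {
                if (run_len >= threshold) break; // keep the first qualifying run
                run_len = 0;
            }
        }
        if (run_len < threshold) return {};
        std::vector<int> picked;
        for (std::size_t i = run_start; i < run_start + run_len; ++i) {
            picked.push_back(static_cast<int>(i)); // stand-in for the rowset handles
        }
        return picked;
    }

    int main() {
        // five consecutive empty rowsets in the middle, threshold of five
        std::vector<int> segs = {3, 0, 0, 0, 0, 0, 2};
        std::cout << pick_first_consecutive_empty(segs, 5).size() << "\n"; // prints 5
    }
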
8 changes: 4 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -43,9 +43,9 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr;
 IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs,
                                                                   const std::string& index_dir,
                                                                   const std::string& file_name) {
-    DorisCompoundReader* directory =
-            new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
-                                    file_name.c_str(), config::inverted_index_read_buffer_size);
+    DorisCompoundReader* directory = new DorisCompoundReader(
+            DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()), file_name.c_str(),
+            config::inverted_index_read_buffer_size);
     auto closeDirectory = true;
     auto index_searcher =
             std::make_shared<lucene::search::IndexSearcher>(directory, closeDirectory);
@@ -190,7 +190,7 @@ int64_t InvertedIndexSearcherCache::mem_consumption() {
 
 bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key,
                                          InvertedIndexCacheHandle* handle) {
-    auto lru_handle = _cache->lookup(key.index_file_path);
+    auto* lru_handle = _cache->lookup(key.index_file_path);
     if (lru_handle == nullptr) {
         return false;
     }