Skip to content

Commit

Permalink
branch-3.0: [fix](index size) discard index size when meta size is in…
Browse files Browse the repository at this point in the history
…valid #46549 (#46717)

cherry pick from #46549
  • Loading branch information
airborne12 authored Jan 13, 2025
1 parent bd32540 commit e174ddc
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 18 deletions.
22 changes: 19 additions & 3 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,25 @@ Status CompactionMixin::do_compact_ordered_rowsets() {

void CompactionMixin::build_basic_info() {
for (auto& rowset : _input_rowsets) {
_input_rowsets_data_size += rowset->data_disk_size();
_input_rowsets_index_size += rowset->index_disk_size();
_input_rowsets_total_size += rowset->total_disk_size();
const auto& rowset_meta = rowset->rowset_meta();
auto index_size = rowset_meta->index_disk_size();
auto total_size = rowset_meta->total_disk_size();
auto data_size = rowset_meta->data_disk_size();
// corrupted index size caused by bug before 2.1.5 or 3.0.0 version
// try to get real index size from disk.
if (index_size < 0 || index_size > total_size * 2) {
LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
<< " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
<< " rowset:" << rowset_meta->rowset_id();
index_size = 0;
auto st = rowset->get_inverted_index_size(&index_size);
if (!st.ok()) {
LOG(ERROR) << "failed to get inverted index size. res=" << st;
}
}
_input_rowsets_data_size += data_size;
_input_rowsets_index_size += index_size;
_input_rowsets_total_size += total_size;
_input_row_num += rowset->num_rows();
_input_num_segments += rowset->num_segments();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Status BetaRowset::do_load(bool /*use_cache*/) {
return Status::OK();
}

Status BetaRowset::get_inverted_index_size(size_t* index_size) {
Status BetaRowset::get_inverted_index_size(int64_t* index_size) {
const auto& fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed, resource_id={}",
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class BetaRowset final : public Rowset {

Status get_segments_size(std::vector<size_t>* segments_size);

Status get_inverted_index_size(size_t* index_size);
Status get_inverted_index_size(int64_t* index_size) override;

[[nodiscard]] virtual Status add_to_binlog() override;

Expand Down
23 changes: 21 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,27 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
_num_rows_written += rowset->num_rows();
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
const auto& rowset_meta = rowset->rowset_meta();
auto index_size = rowset_meta->index_disk_size();
auto total_size = rowset_meta->total_disk_size();
auto data_size = rowset_meta->data_disk_size();
// corrupted index size caused by bug before 2.1.5 or 3.0.0 version
// try to get real index size from disk.
if (index_size < 0 || index_size > total_size * 2) {
LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
<< " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
<< " rowset:" << rowset_meta->rowset_id();
index_size = 0;
auto st = rowset->get_inverted_index_size(&index_size);
if (!st.ok()) {
if (!st.is<NOT_FOUND>()) {
LOG(ERROR) << "failed to get inverted index size. res=" << st;
return st;
}
}
}
_total_data_size += data_size;
_total_index_size += index_size;
_num_segment += static_cast<int32_t>(rowset->num_segments());
// append key_bounds to current rowset
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
// helper class to access RowsetMeta
int64_t start_version() const { return rowset_meta()->version().first; }
int64_t end_version() const { return rowset_meta()->version().second; }
size_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
size_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
size_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
int64_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
int64_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
int64_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
bool empty() const { return rowset_meta()->empty(); }
bool zero_num_rows() const { return rowset_meta()->num_rows() == 0; }
size_t num_rows() const { return rowset_meta()->num_rows(); }
Expand Down Expand Up @@ -210,6 +210,8 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
size_t new_rowset_start_seg_id = 0,
std::set<int64_t>* without_index_uids = nullptr) = 0;

virtual Status get_inverted_index_size(int64_t* index_size) = 0;

// copy all files to `dir`
virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;

Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,21 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {

void set_num_rows(int64_t num_rows) { _rowset_meta_pb.set_num_rows(num_rows); }

size_t total_disk_size() const { return _rowset_meta_pb.total_disk_size(); }
int64_t total_disk_size() const { return _rowset_meta_pb.total_disk_size(); }

void set_total_disk_size(size_t total_disk_size) {
void set_total_disk_size(int64_t total_disk_size) {
_rowset_meta_pb.set_total_disk_size(total_disk_size);
}

size_t data_disk_size() const { return _rowset_meta_pb.data_disk_size(); }
int64_t data_disk_size() const { return _rowset_meta_pb.data_disk_size(); }

void set_data_disk_size(size_t data_disk_size) {
void set_data_disk_size(int64_t data_disk_size) {
_rowset_meta_pb.set_data_disk_size(data_disk_size);
}

size_t index_disk_size() const { return _rowset_meta_pb.index_disk_size(); }
int64_t index_disk_size() const { return _rowset_meta_pb.index_disk_size(); }

void set_index_disk_size(size_t index_disk_size) {
void set_index_disk_size(int64_t index_disk_size) {
_rowset_meta_pb.set_index_disk_size(index_disk_size);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Status IndexBuilder::update_inverted_index_info() {
TabletSchemaSPtr output_rs_tablet_schema = std::make_shared<TabletSchema>();
const auto& input_rs_tablet_schema = input_rowset->tablet_schema();
output_rs_tablet_schema->copy_from(*input_rs_tablet_schema);
size_t total_index_size = 0;
int64_t total_index_size = 0;
auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get());
auto size_st = beta_rowset->get_inverted_index_size(&total_index_size);
DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", {
Expand Down
138 changes: 137 additions & 1 deletion be/test/olap/ordered_data_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ class OrderedDataCompactionTest : public ::testing::Test {
EXPECT_TRUE(io::global_local_filesystem()
->create_directory(absolute_dir + "/tablet_path")
.ok());
// tmp dir
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
std::vector<StorePath> paths;
paths.emplace_back(std::string(tmp_dir), 1024000000);
auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
st = tmp_file_dirs->init();
EXPECT_TRUE(st.ok()) << st.to_json();
ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));

doris::EngineOptions options;
auto engine = std::make_unique<StorageEngine>(options);
Expand Down Expand Up @@ -153,6 +162,62 @@ class OrderedDataCompactionTest : public ::testing::Test {
return tablet_schema;
}

TabletSchemaSPtr create_inverted_index_v1_schema(KeysType keys_type = DUP_KEYS) {
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
TabletSchemaPB tablet_schema_pb;
tablet_schema_pb.set_keys_type(keys_type);
tablet_schema_pb.set_num_short_key_columns(1);
tablet_schema_pb.set_num_rows_per_row_block(1024);
tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
tablet_schema_pb.set_next_column_unique_id(4);
tablet_schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);

auto* index_pb = tablet_schema_pb.add_index();
index_pb->set_index_id(1);
index_pb->set_index_name("c1_index");
index_pb->set_index_type(IndexType::INVERTED);
index_pb->add_col_unique_id(2);

ColumnPB* column_1 = tablet_schema_pb.add_column();
column_1->set_unique_id(1);
column_1->set_name("c1");
column_1->set_type("INT");
column_1->set_is_key(true);
column_1->set_length(4);
column_1->set_index_length(4);
column_1->set_is_nullable(false);
column_1->set_is_bf_column(false);

ColumnPB* column_2 = tablet_schema_pb.add_column();
column_2->set_unique_id(2);
column_2->set_name("c2");
column_2->set_type("INT");
column_2->set_length(4);
column_2->set_index_length(4);
column_2->set_is_nullable(true);
column_2->set_is_key(false);
column_2->set_is_nullable(false);
column_2->set_is_bf_column(false);

// unique table must contains the DELETE_SIGN column
if (keys_type == UNIQUE_KEYS) {
ColumnPB* column_3 = tablet_schema_pb.add_column();
column_3->set_unique_id(3);
column_3->set_name(DELETE_SIGN);
column_3->set_type("TINYINT");
column_3->set_length(1);
column_3->set_index_length(1);
column_3->set_is_nullable(false);
column_3->set_is_key(false);
column_3->set_is_nullable(false);
column_3->set_is_bf_column(false);
}

tablet_schema->init_from_pb(tablet_schema_pb);

return tablet_schema;
}

TabletSchemaSPtr create_agg_schema() {
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
TabletSchemaPB tablet_schema_pb;
Expand Down Expand Up @@ -401,7 +466,8 @@ class OrderedDataCompactionTest : public ::testing::Test {
}

private:
const std::string kTestDir = "/ut_dir/vertical_compaction_test";
const std::string kTestDir = "/ut_dir/ordered_compaction_test";
const std::string tmp_dir = "./ut_dir/ordered_compaction_test/tmp";
string absolute_dir;
std::unique_ptr<DataDir> _data_dir;
};
Expand Down Expand Up @@ -487,5 +553,75 @@ TEST_F(OrderedDataCompactionTest, test_01) {
}
}

TEST_F(OrderedDataCompactionTest, test_index_disk_size) {
auto num_input_rowset = 3;
auto num_segments = 2;
auto rows_per_segment = 50;
std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
generate_input_data(num_input_rowset, num_segments, rows_per_segment, input_data);

TabletSchemaSPtr tablet_schema = create_inverted_index_v1_schema();
TabletSharedPtr tablet = create_tablet(*tablet_schema, false, 10000, false);
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());

vector<RowsetSharedPtr> input_rowsets;
SegmentsOverlapPB new_overlap = NONOVERLAPPING;
for (auto i = 0; i < num_input_rowset; i++) {
RowsetWriterContext writer_context;
create_rowset_writer_context(tablet_schema, tablet->tablet_path(), new_overlap, UINT32_MAX,
&writer_context);

auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, false);
EXPECT_TRUE(res.has_value()) << res.error();
auto rowset_writer = std::move(res).value();

uint32_t num_rows = 0;
for (int j = 0; j < input_data[i].size(); ++j) {
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < input_data[i][j].size(); ++rid) {
int32_t c1 = std::get<0>(input_data[i][j][rid]);
int32_t c2 = std::get<1>(input_data[i][j][rid]);
columns[0]->insert_data((const char*)&c1, sizeof(c1));
columns[1]->insert_data((const char*)&c2, sizeof(c2));

if (tablet_schema->keys_type() == UNIQUE_KEYS) {
uint8_t num = 0;
columns[2]->insert_data((const char*)&num, sizeof(num));
}
num_rows++;
}
auto s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_TRUE(s.ok());
}

RowsetSharedPtr rowset;
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
EXPECT_EQ(input_data[i].size(), rowset->rowset_meta()->num_segments());
EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows());

// Set random index_disk_size
rowset->rowset_meta()->set_index_disk_size(1024000000000000LL);
input_rowsets.push_back(rowset);
}

CumulativeCompaction cu_compaction(*engine_ref, tablet);
cu_compaction._input_rowsets = std::move(input_rowsets);
EXPECT_EQ(cu_compaction.handle_ordered_data_compaction(), true);

auto& out_rowset = cu_compaction._output_rowset;

// Verify the index_disk_size of the output rowset
int64_t expected_total_size = 0;
for (const auto& rowset : cu_compaction._input_rowsets) {
expected_total_size += rowset->rowset_meta()->total_disk_size();
}
std::cout << "expected_total_size: " << expected_total_size << std::endl;
std::cout << "actual_total_disk_size: " << out_rowset->rowset_meta()->total_disk_size()
<< std::endl;
EXPECT_EQ(out_rowset->rowset_meta()->total_disk_size(), expected_total_size);
}
} // namespace vectorized
} // namespace doris
4 changes: 4 additions & 0 deletions be/test/testutil/mock_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class MockRowset : public Rowset {
return Status::NotSupported("MockRowset not support this method.");
}

Status get_inverted_index_size(int64_t* index_size) override {
return Status::NotSupported("MockRowset not support this method.");
}

void clear_inverted_index_cache() override {}

Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) override {
Expand Down

0 comments on commit e174ddc

Please sign in to comment.