[fix](inverted index) fix rowset data size when enable index compaction
csun5285 committed Dec 12, 2024
1 parent db2c4d4 commit 2045954
Showing 2 changed files with 202 additions and 1 deletion.
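In short, the change sums the sizes of the inverted index files produced by index compaction and folds that total back into the output rowset's meta, so data_disk_size, total_disk_size, and index_disk_size all account for the compacted index data. A minimal sketch of that accounting (simplified, hypothetical helper names, not the actual Doris call sites):

    // Sketch only: index_file_path() and compacted_index_ids are hypothetical stand-ins.
    int64_t compacted_index_file_size = 0;
    for (int seg = 0; seg < dest_segment_num; ++seg) {
        for (int64_t index_id : compacted_index_ids) {
            int64_t size = 0;
            if (fs->file_size(index_file_path(seg, index_id), &size).ok()) {
                compacted_index_file_size += size;   // bytes written by index compaction
            }
        }
    }
    auto meta = output_rowset->rowset_meta();
    meta->set_data_disk_size(meta->data_disk_size() + compacted_index_file_size);
    meta->set_total_disk_size(meta->total_disk_size() + compacted_index_file_size);
    meta->set_index_disk_size(meta->index_disk_size() + compacted_index_file_size);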
59 changes: 58 additions & 1 deletion be/src/olap/compaction.cpp
@@ -524,6 +524,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
auto& fs = _output_rowset->rowset_meta()->fs();
auto& tablet_path = _tablet->tablet_path();

// After index compaction, this size needs to be added to rowset->total_size
int64_t compacted_index_file_size = 0;

// We choose the first destination segment name as the temporary index writer path,
// used to distinguish between different index compactions.
auto index_writer_path = tablet_path + "/" + dest_index_files[0];
@@ -536,7 +539,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(),
[&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
&dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows,
&status, this](int32_t column_uniq_id) {
&status, &compacted_index_file_size, this](int32_t column_uniq_id) {
auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) {
LOG(WARNING) << "failed to do index compaction"
<< ". tablet=" << _tablet->tablet_id()
@@ -584,6 +587,25 @@ Status Compaction::do_compaction_impl(int64_t permits) {
error_handler(index_id, column_uniq_id);
status = Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
st.msg());
} else {
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_columnId
auto seg_path =
std::static_pointer_cast<BetaRowset>(_output_rowset)
->segment_file_path(i);
std::string index_path =
InvertedIndexDescriptor::get_index_file_name(seg_path,
index_id);
int64_t current_size = 0;
st = fs->file_size(index_path, &current_size);
if (!st.ok()) {
error_handler(index_id, column_uniq_id);
status = Status::Error<
ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
st.msg());
}
compacted_index_file_size += current_size;
}
}
} catch (CLuceneError& e) {
error_handler(index_id, column_uniq_id);
@@ -597,6 +619,41 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return status;
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(
_output_rowset->rowset_meta()->data_disk_size() + compacted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(
_output_rowset->rowset_meta()->total_disk_size() + compacted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
compacted_index_file_size);

DBUG_EXECUTE_IF("check_after_compaction_file_size", {
int64_t total_file_size = 0;
for (int i = 0; i < dest_segment_num; ++i) {
auto seg_path = std::static_pointer_cast<BetaRowset>(_output_rowset)
->segment_file_path(i);
int64_t current_size = 0;
RETURN_IF_ERROR(fs->file_size(seg_path, &current_size));
total_file_size += current_size;
for (auto& column : _cur_tablet_schema->columns()) {
const TabletIndex* index_meta =
_cur_tablet_schema->get_inverted_index(column.unique_id());
if (index_meta) {
std::string index_path = InvertedIndexDescriptor::get_index_file_name(
seg_path, index_meta->index_id());
RETURN_IF_ERROR(fs->file_size(index_path, &current_size));
total_file_size += current_size;
}
}
}
if (total_file_size != _output_rowset->rowset_meta()->data_disk_size()) {
Status::Error<ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>(
"total file size {} is not equal rowset meta size {}", total_file_size,
_output_rowset->rowset_meta()->data_disk_size());
}
LOG(INFO) << "succeed to check index compaction file size";
})

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->full_name()
<< ", input row number=" << _input_row_num
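The second file (the 144 added lines below) is a new Groovy regression test: it creates a table with three inverted indexes, loads twelve single-row rowsets, enables inverted_index_compaction_enable, triggers a full compaction, and switches on the check_after_compaction_file_size debug point so the backend verifies the accounting itself. The invariant that the debug point enforces boils down to the following sketch (again with hypothetical helper names):

    // Sketch of the invariant, not the literal check: the rowset meta's
    // data_disk_size must match the bytes actually on disk after compaction.
    int64_t total_file_size = 0;
    for (int seg = 0; seg < dest_segment_num; ++seg) {
        total_file_size += file_size(segment_path(seg));            // segment data file
        for (const auto& col : columns_with_inverted_index) {       // hypothetical helper
            total_file_size += file_size(index_path(seg, col));     // that column's index file
        }
    }
    assert(total_file_size == output_rowset->rowset_meta()->data_disk_size());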
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite("test_index_compaction_rowset_size", "p0, nonConcurrent") {

def show_table_name = "test_index_compaction_rowset_size"

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
}
set_be_config.call("inverted_index_compaction_enable", "true")

sql "DROP TABLE IF EXISTS ${show_table_name}"
sql """
CREATE TABLE ${show_table_name} (
`@timestamp` int(11) NULL,
`clientip` varchar(20) NULL,
`request` varchar(500) NULL,
`status` int NULL,
`size` int NULL,
INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode") COMMENT '',
INDEX size_idx (`size`) USING INVERTED COMMENT '',
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`, `clientip`)
DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"compaction_policy" = "time_series",
"time_series_compaction_file_count_threshold" = "20",
"disable_auto_compaction" = "true"
);
"""

def compaction = {

def tablets = sql_return_maparray """ show tablets from ${show_table_name}; """

for (def tablet in tablets) {
int beforeSegmentCount = 0
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
for (String rowset in (List<String>) tabletJson.rowsets) {
beforeSegmentCount += Integer.parseInt(rowset.split(" ")[1])
}
assertEquals(beforeSegmentCount, 12)
}

// trigger compactions for all tablets in ${show_table_name}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
assertEquals("success", compactJson.status.toLowerCase())
}

// wait for all compactions done
for (def tablet in tablets) {
Awaitility.await().atMost(10, TimeUnit.MINUTES).untilAsserted(() -> {
Thread.sleep(5000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("compaction task for this tablet is not running", compactionStatus.msg.toLowerCase())
});
}


for (def tablet in tablets) {
int afterSegmentCount = 0
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
for (String rowset in (List<String>) tabletJson.rowsets) {
logger.info("rowset is: " + rowset)
afterSegmentCount += Integer.parseInt(rowset.split(" ")[1])
}
assertEquals(afterSegmentCount, 1)
}
}



// 1. load data
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """
sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """

try {
GetDebugPoint().enableDebugPointForAllBEs("check_after_compaction_file_size")
// 2. compaction
compaction.call()
} finally {
GetDebugPoint().disableDebugPointForAllBEs("check_after_compaction_file_size")
}


}
