diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index d7f28436800fef..b4ace1f5170595 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -524,6 +524,9 @@ Status Compaction::do_compaction_impl(int64_t permits) { auto& fs = _output_rowset->rowset_meta()->fs(); auto& tablet_path = _tablet->tablet_path(); + // After doing index compaction, need to add this size to rowset->total_size + int64_t compacted_index_file_size = 0; + // we choose the first destination segment name as the temporary index writer path // Used to distinguish between different index compaction auto index_writer_path = tablet_path + "/" + dest_index_files[0]; @@ -536,7 +539,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(), [&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files, &dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows, - &status, this](int32_t column_uniq_id) { + &status, &compacted_index_file_size, this](int32_t column_uniq_id) { auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { LOG(WARNING) << "failed to do index compaction" << ". tablet=" << _tablet->tablet_id() @@ -584,6 +587,25 @@ Status Compaction::do_compaction_impl(int64_t permits) { error_handler(index_id, column_uniq_id); status = Status::Error( st.msg()); + } else { + for (int i = 0; i < dest_segment_num; ++i) { + // format: rowsetId_segmentId_columnId + auto seg_path = + std::static_pointer_cast(_output_rowset) + ->segment_file_path(i); + std::string index_path = + InvertedIndexDescriptor::get_index_file_name(seg_path, + index_id); + int64_t current_size = 0; + st = fs->file_size(index_path, ¤t_size); + if (!st.ok()) { + error_handler(index_id, column_uniq_id); + status = Status::Error< + ErrorCode::INVERTED_INDEX_COMPACTION_ERROR>( + st.msg()); + } + compacted_index_file_size += current_size; + } } } catch (CLuceneError& e) { error_handler(index_id, column_uniq_id); @@ -597,6 +619,41 @@ Status Compaction::do_compaction_impl(int64_t permits) { return status; } + // index compaction should update total disk size and index disk size= + _output_rowset->rowset_meta()->set_data_disk_size( + _output_rowset->rowset_meta()->data_disk_size() + compacted_index_file_size); + _output_rowset->rowset_meta()->set_total_disk_size( + _output_rowset->rowset_meta()->total_disk_size() + compacted_index_file_size); + _output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() + + compacted_index_file_size); + + DBUG_EXECUTE_IF("check_after_compaction_file_size", { + int64_t total_file_size = 0; + for (int i = 0; i < dest_segment_num; ++i) { + auto seg_path = std::static_pointer_cast(_output_rowset) + ->segment_file_path(i); + int64_t current_size = 0; + RETURN_IF_ERROR(fs->file_size(seg_path, ¤t_size)); + total_file_size += current_size; + for (auto& column : _cur_tablet_schema->columns()) { + const TabletIndex* index_meta = + _cur_tablet_schema->get_inverted_index(column.unique_id()); + if (index_meta) { + std::string index_path = InvertedIndexDescriptor::get_index_file_name( + seg_path, index_meta->index_id()); + RETURN_IF_ERROR(fs->file_size(index_path, ¤t_size)); + total_file_size += current_size; + } + } + } + if (total_file_size != _output_rowset->rowset_meta()->data_disk_size()) { + Status::Error( + "total file size {} is not equal rowset meta size {}", total_file_size, + _output_rowset->rowset_meta()->data_disk_size()); + } + LOG(INFO) << "succeed to check index compaction file size"; + }) + LOG(INFO) << "succeed to do index compaction" << ". tablet=" << _tablet->full_name() << ", input row number=" << _input_row_num diff --git a/regression-test/suites/fault_injection_p0/test_index_compaction_rowset_size.groovy b/regression-test/suites/fault_injection_p0/test_index_compaction_rowset_size.groovy new file mode 100644 index 00000000000000..9dd3b1fa09a7a1 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_index_compaction_rowset_size.groovy @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_index_compaction_rowset_size", "p0, nonConcurrent") { + + def show_table_name = "test_index_compaction_rowset_size" + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + set_be_config.call("inverted_index_compaction_enable", "true") + + sql "DROP TABLE IF EXISTS ${show_table_name}" + sql """ + CREATE TABLE ${show_table_name} ( + `@timestamp` int(11) NULL, + `clientip` varchar(20) NULL, + `request` varchar(500) NULL, + `status` int NULL, + `size` int NULL, + INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode") COMMENT '', + INDEX size_idx (`size`) USING INVERTED COMMENT '', + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`, `clientip`) + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "compaction_policy" = "time_series", + "time_series_compaction_file_count_threshold" = "20", + "disable_auto_compaction" = "true" + ); + """ + + def compaction = { + + def tablets = sql_return_maparray """ show tablets from ${show_table_name}; """ + + for (def tablet in tablets) { + int beforeSegmentCount = 0 + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List) tabletJson.rowsets) { + beforeSegmentCount += Integer.parseInt(rowset.split(" ")[1]) + } + assertEquals(beforeSegmentCount, 12) + } + + // trigger compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertEquals("success", compactJson.status.toLowerCase()) + } + + // wait for all compactions done + for (def tablet in tablets) { + Awaitility.await().atMost(10, TimeUnit.MINUTES).untilAsserted(() -> { + Thread.sleep(5000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("compaction task for this tablet is not running", compactionStatus.msg.toLowerCase()) + }); + } + + + for (def tablet in tablets) { + int afterSegmentCount = 0 + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List) tabletJson.rowsets) { + logger.info("rowset is: " + rowset) + afterSegmentCount += Integer.parseInt(rowset.split(" ")[1]) + } + assertEquals(afterSegmentCount, 1) + } + } + + + + // 1. load data + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + sql """ INSERT INTO ${show_table_name} VALUES (100, "andy", "andy love apple", 100, 200); """ + + try { + GetDebugPoint().enableDebugPointForAllBEs("check_after_compaction_file_size") + // 2. compaction + compaction.call() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("check_after_compaction_file_size") + } + + +}