diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 006e8a15a6ffc7..643f1837982a0f 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -53,6 +53,7 @@
 #include "runtime/exec_env.h"
 #include "service/backend_options.h"
 #include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
 #include "util/mem_info.h"
 #include "util/ref_count_closure.h"
 #include "util/stopwatch.hpp"
@@ -102,6 +103,7 @@ Status DeltaWriterV2::init() {
         return Status::OK();
     }
     // build tablet schema in request level
+    DBUG_EXECUTE_IF("DeltaWriterV2.init.stream_size", { _streams.clear(); });
     if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
         return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
     }
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index ff2d16bd1f508f..7190597b54cf1f 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -81,7 +81,7 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
     {
         std::shared_lock rdlk(_flush_status_lock);
         DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
-                _flush_status = Status::IOError("dbug_be_memtable_submit_flush_error");
+            _flush_status = Status::IOError("dbug_be_memtable_submit_flush_error");
         });
         if (!_flush_status.ok()) {
             return _flush_status;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index f68d82911ce8a9..4972a397f701e3 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -196,10 +196,17 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
     // get table's tuple descriptor
     _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
+    DBUG_EXECUTE_IF("VTabletWriterV2._init._output_tuple_desc_null",
+                    { _output_tuple_desc = nullptr; });
     if (_output_tuple_desc == nullptr) {
         return Status::InternalError("unknown destination tuple descriptor, id = {}",
                                      _tuple_desc_id);
     }
+    DBUG_EXECUTE_IF("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot", {
+        return Status::InvalidArgument(
+                "output_tuple_slot_num {} should be equal to output_expr_num {}",
+                _output_tuple_desc->slots().size() + 1, _vec_output_expr_ctxs.size());
+    });
     if (_vec_output_expr_ctxs.size() > 0 &&
         _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) {
         LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
@@ -264,6 +271,8 @@ Status VTabletWriterV2::_open_streams(int64_t src_id) {
 Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id,
                                                  ::doris::stream_load::LoadStreams& streams) {
     const auto* node_info = _nodes_info->find_node(dst_id);
+    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
+                    { node_info = nullptr; });
     if (node_info == nullptr) {
         return Status::InternalError("Unknown node {} in tablet location", dst_id);
     }
@@ -289,6 +298,8 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
         for (const auto& index : partition->indexes) {
             for (const auto& tablet_id : index.tablets) {
                 auto tablet_location = _location->find_tablet(tablet_id);
+                DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
+                                { tablet_location = nullptr; });
                 if (tablet_location == nullptr) {
                     return Status::InternalError("unknown tablet location, tablet id = {}",
                                                  tablet_id);
@@ -338,6 +349,7 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
 Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
                                         Streams& streams) {
     const auto* location = _location->find_tablet(tablet_id);
+    DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; });
     if (location == nullptr) {
         return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
     }
@@ -484,6 +496,8 @@ Status VTabletWriterV2::close(Status exec_status) {
         status = _send_new_partition_batch();
     }
 
+    DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel",
+                    { status = Status::InternalError("load cancel"); });
     if (status.ok()) {
         // only if status is ok can we call this _profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that _profile is null
diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy
index b28b2e64004148..849a1298301e15 100644
--- a/regression-test/pipeline/p0/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p0/conf/regression-conf.groovy
@@ -58,7 +58,7 @@ excludeGroups = ""
 excludeSuites = "test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external"
 
 // this directories will not be executed
-excludeDirectories = "workload_manager_p1,fault_injection_p0,nereids_rules_p0/subquery"
+excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery"
 
 customConf1 = "test_custom_conf_value"
 
diff --git a/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
index 9ddf13322c58f0..99f1173acfaaa2 100644
--- a/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
+++ b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
@@ -16,7 +16,7 @@
 // under the License.
 
 
-suite("test_build_index_fault", "inverted_index"){
+suite("test_build_index_fault", "inverted_index, nonConcurrent"){
     // prepare test table
     def timeout = 60000
     def delta_time = 1000
diff --git a/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
index 99ae467ff4d246..852566c0e05fb3 100644
--- a/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
@@ -18,7 +18,7 @@
 import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.apache.doris.regression.util.Http
 
-suite("mem_gc_when_load") {
+suite("mem_gc_when_load", "nonConcurrent") {
     // init query case data
     sql """
         CREATE TABLE IF NOT EXISTS `baseall` (
diff --git a/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy b/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy
index e1a4a1a3a76a71..a1a7be381ec164 100644
--- a/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy
+++ b/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_memtable_flush_fault") { +suite("test_memtable_flush_fault", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() def testTable = "test_memtable_flush_fault" sql """ DROP TABLE IF EXISTS ${testTable}""" diff --git a/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy index a4ded1a58e0f3d..a850111dee85fa 100644 --- a/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy @@ -18,26 +18,25 @@ import org.codehaus.groovy.runtime.IOGroovyMethods import org.apache.doris.regression.util.Http -def tableName = "segcompaction_correctness_test" -def create_table_sql = """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20), - `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20), - `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20), - `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20), - `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20), - `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20), - `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20), - `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20), - `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20), - `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20) - ) - DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1 - PROPERTIES ( "replication_num" = "1" ); - """ -def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49" - suite("test_segcompaction_correctness", "nonConcurrent") { + def tableName = "segcompaction_correctness_test" + def create_table_sql = """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20), + `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20), + `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20), + `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20), + `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20), + `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20), + `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20), + `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20), + `col_40` VARCHAR(20),`col_41` 
+            `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+            `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+            )
+            DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "1" );
+            """
+    def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
     def runLoadWithSegcompaction = {
         String ak = getS3AK()
         String sk = getS3SK()
diff --git a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
new file mode 100644
index 00000000000000..e6e5758b2b3a6a
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
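+
+// Exercises the writer-v2 fault injection debug points: each injection below forces
+// an early error path in VTabletWriterV2 / DeltaWriterV2, and the corresponding
+// insert is expected to fail with the matching error message.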
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_writer_v2_fault_injection", "nonConcurrent") {
+    sql """ set enable_memtable_on_sink_node=true """
+    sql """
+        CREATE TABLE IF NOT EXISTS `baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    streamLoad {
+        table "baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+
+    def load_with_injection = { injection, error_msg->
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(injection)
+            sql "insert into test select * from baseall where k1 <= 3"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains(error_msg))
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        }
+    }
+
+    // VTabletWriterV2 _output_tuple_desc is null
+    load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "unknown destination tuple descriptor")
+    // VTabletWriterV2 _vec_output_expr_ctxs not equal _output_tuple_slot
+    load_with_injection("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot", "should be equal to output_expr_num")
+    // VTabletWriterV2 node_info is null
+    load_with_injection("VTabletWriterV2._open_streams_to_backend.node_info_null", "Unknown node")
+    // VTabletWriterV2 tablet_location is null
+    load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", "unknown tablet location")
+    // VTabletWriterV2 location is null
+    load_with_injection("VTabletWriterV2._select_streams.location_null", "unknown tablet location")
+    // VTabletWriterV2 cancel
+    load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
+    // DeltaWriterV2 stream_size is 0
+    load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")
+
+    sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file