Skip to content

Commit

Permalink
[improve](move-memtable) add fault injection in writer v2 (#29177)
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored Dec 29, 2023
1 parent e62857e commit 5bc72bd
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 24 deletions.
2 changes: 2 additions & 0 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/ref_count_closure.h"
#include "util/stopwatch.hpp"
Expand Down Expand Up @@ -102,6 +103,7 @@ Status DeltaWriterV2::init() {
return Status::OK();
}
// build tablet schema in request level
DBUG_EXECUTE_IF("DeltaWriterV2.init.stream_size", { _streams.clear(); });
if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
{
std::shared_lock rdlk(_flush_status_lock);
DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
_flush_status = Status::IOError("dbug_be_memtable_submit_flush_error");
_flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
});
if (!_flush_status.ok()) {
return _flush_status;
Expand Down
14 changes: 14 additions & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,17 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {

// get table's tuple descriptor
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
DBUG_EXECUTE_IF("VTabletWriterV2._init._output_tuple_desc_null",
{ _output_tuple_desc = nullptr; });
if (_output_tuple_desc == nullptr) {
return Status::InternalError("unknown destination tuple descriptor, id = {}",
_tuple_desc_id);
}
DBUG_EXECUTE_IF("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot", {
return Status::InvalidArgument(
"output_tuple_slot_num {} should be equal to output_expr_num {}",
_output_tuple_desc->slots().size() + 1, _vec_output_expr_ctxs.size());
});
if (_vec_output_expr_ctxs.size() > 0 &&
_output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) {
LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
Expand Down Expand Up @@ -264,6 +271,8 @@ Status VTabletWriterV2::_open_streams(int64_t src_id) {
Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id,
::doris::stream_load::LoadStreams& streams) {
const auto* node_info = _nodes_info->find_node(dst_id);
DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
{ node_info = nullptr; });
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
Expand All @@ -289,6 +298,8 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
for (const auto& index : partition->indexes) {
for (const auto& tablet_id : index.tablets) {
auto tablet_location = _location->find_tablet(tablet_id);
DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
{ tablet_location = nullptr; });
if (tablet_location == nullptr) {
return Status::InternalError("unknown tablet location, tablet id = {}",
tablet_id);
Expand Down Expand Up @@ -338,6 +349,7 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
Streams& streams) {
const auto* location = _location->find_tablet(tablet_id);
DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; });
if (location == nullptr) {
return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
}
Expand Down Expand Up @@ -484,6 +496,8 @@ Status VTabletWriterV2::close(Status exec_status) {
status = _send_new_partition_batch();
}

DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel",
{ status = Status::InternalError("load cancel"); });
if (status.ok()) {
// only if status is ok can we call this _profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that _profile is null
Expand Down
2 changes: 1 addition & 1 deletion regression-test/pipeline/p0/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ excludeGroups = ""
excludeSuites = "test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external"

// these directories will not be executed
excludeDirectories = "workload_manager_p1,fault_injection_p0,nereids_rules_p0/subquery"
excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery"

customConf1 = "test_custom_conf_value"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.


suite("test_build_index_fault", "inverted_index"){
suite("test_build_index_fault", "inverted_index, nonConcurrent"){
// prepare test table
def timeout = 60000
def delta_time = 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http

suite("mem_gc_when_load") {
suite("mem_gc_when_load", "nonConcurrent") {
// init query case data
sql """
CREATE TABLE IF NOT EXISTS `baseall` (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_memtable_flush_fault") {
suite("test_memtable_flush_fault", "nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllBEs()
def testTable = "test_memtable_flush_fault"
sql """ DROP TABLE IF EXISTS ${testTable}"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http

def tableName = "segcompaction_correctness_test"
def create_table_sql = """
CREATE TABLE IF NOT EXISTS ${tableName} (
`col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
`col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
`col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
`col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
`col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
`col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
`col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
`col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
`col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
`col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
)
DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
PROPERTIES ( "replication_num" = "1" );
"""
def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"

suite("test_segcompaction_correctness", "nonConcurrent") {
def tableName = "segcompaction_correctness_test"
def create_table_sql = """
CREATE TABLE IF NOT EXISTS ${tableName} (
`col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
`col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
`col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
`col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
`col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
`col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
`col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
`col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
`col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
`col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
)
DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
PROPERTIES ( "replication_num" = "1" );
"""
def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
def runLoadWithSegcompaction = {
String ak = getS3AK()
String sk = getS3SK()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http

// Fault-injection regression suite for the "writer v2" load path.
// With enable_memtable_on_sink_node turned on, it enables one backend debug point
// at a time and checks that an INSERT ... SELECT from `baseall` into `test` fails
// with the error message associated with that debug point.
suite("test_writer_v2_fault_injection", "nonConcurrent") {
    sql """ set enable_memtable_on_sink_node=true """
    // Source table; filled by the stream load below.
    sql """
CREATE TABLE IF NOT EXISTS `baseall` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
    // Destination table targeted by the INSERT issued under fault injection.
    sql """
CREATE TABLE IF NOT EXISTS `test` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace_if_not_null null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""

    // Start from a clean state so the initial data load is not affected by
    // debug points that may still be enabled on the backends.
    GetDebugPoint().clearDebugPointsForAllBEs()
    streamLoad {
        table "baseall"
        db "regression_test_fault_injection_p0"
        set 'column_separator', ','
        file "baseall.txt"
    }

    // Enables the debug point `injection` on all BEs, runs the INSERT, and requires
    // it to fail with a message containing `error_msg`. The debug point is always
    // disabled afterwards, whether or not the checks pass.
    def load_with_injection = { injection, error_msg ->
        try {
            GetDebugPoint().enableDebugPointForAllBEs(injection)
            sql "insert into test select * from baseall where k1 <= 3"
            // Reaching this line means the injected fault did not surface. Fail
            // explicitly; otherwise the case would pass without checking anything.
            // The assertion failure is an Error, so the catch (Exception) below
            // does not swallow it.
            assertTrue(false, String.format(
                    "expected load to fail with '%s' under debug point '%s', but it succeeded",
                    error_msg, injection))
        } catch (Exception e) {
            logger.info(e.getMessage())
            assertTrue(e.getMessage().contains(error_msg),
                    String.format("expected '%s', actual '%s'", error_msg, e.getMessage()))
        } finally {
            GetDebugPoint().disableDebugPointForAllBEs(injection)
        }
    }

    // VTabletWriterV2: _output_tuple_desc is null
    load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "unknown destination tuple descriptor")
    // VTabletWriterV2: number of output exprs differs from number of output tuple slots
    load_with_injection("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot", "should be equal to output_expr_num")
    // VTabletWriterV2: node_info is null
    load_with_injection("VTabletWriterV2._open_streams_to_backend.node_info_null", "Unknown node")
    // VTabletWriterV2: tablet_location is null while building the tablet-node mapping
    load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", "unknown tablet location")
    // VTabletWriterV2: location is null while selecting streams
    load_with_injection("VTabletWriterV2._select_streams.location_null", "unknown tablet location")
    // VTabletWriterV2: load cancelled in close()
    load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
    // DeltaWriterV2: stream list is empty
    load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")

    sql """ set enable_memtable_on_sink_node=false """
}

0 comments on commit 5bc72bd

Please sign in to comment.