Skip to content

Commit

Permalink
[fix](cloud-mow) schema change should retry when encouter TXN_CONFILC…
Browse files Browse the repository at this point in the history
…T in cloud mode
  • Loading branch information
hust-hhb committed Jan 10, 2025
1 parent 8405321 commit 5125477
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 1 deletion.
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
} else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
res->status().msg());
} else if (res->status().code() ==
MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
return Status::Error<ErrorCode::TXN_CONFLICT, false>("failed to {}: {}", op_name,
res->status().msg());
} else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
res->status().msg());
Expand Down
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,14 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}

cloud::FinishTabletJobResponse finish_resp;
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict", {
std::srand(static_cast<unsigned int>(std::time(nullptr)));
int random_value = std::rand() % 100;
if (random_value < 50) {
LOG(INFO) << "test txn conflict";
return Status::Error<ErrorCode::TXN_CONFLICT>("test txn conflict");
}
});
auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
if (!st.ok()) {
if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ namespace ErrorCode {
TStatusError(NOT_MASTER, true); \
TStatusError(OBTAIN_LOCK_FAILED, false); \
TStatusError(SNAPSHOT_EXPIRED, false); \
TStatusError(TXN_CONFLICT, false); \
TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3306,6 +3306,14 @@ public static int metaServiceRpcRetryTimes() {
"Maximal concurrent num of get tablet stat job."})
public static int max_get_tablet_stat_task_threads_num = 4;

@ConfField(mutable = true, description = {"存算分离模式下schema change失败是否重试",
"Whether to enable retry when schema change failed in cloud model, default is true."})
public static boolean enable_schema_change_retry_in_cloud_mode = true;

@ConfField(mutable = true, description = {"存算分离模式下schema change重试次数",
"Max retry times when schema change failed in cloud model, default is 3."})
public static int schema_change_max_retry_time = 3;

// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
Expand Down Expand Up @@ -577,7 +578,14 @@ protected void runRunningJob() throws AlterCancelException {
List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(2000);
ensureCloudClusterExist(tasks);
for (AgentTask task : tasks) {
if (task.getFailedTimes() > 0) {
int maxFailedTimes = 0;
if (Config.isCloudMode() && Config.enable_schema_change_retry_in_cloud_mode) {
if (task.getErrorCode() != null && task.getErrorCode().equals(TStatusCode.TXN_CONFLICT)) {
maxFailedTimes = Config.schema_change_max_retry_time;
}
LOG.warn("schema change task failed: {}, maxFailedTimes {}", task.getErrorMsg(), maxFailedTimes);
}
if (task.getFailedTimes() > maxFailedTimes) {
task.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
LOG.warn("schema change task failed: {}", task.getErrorMsg());
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/Status.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ enum TStatusCode {

SNAPSHOT_EXPIRED = 75,

TXN_CONFLICT = 76,

// used for cloud
DELETE_BITMAP_LOCK_ERROR = 100,
// Not be larger than 200, see status.h
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite("test_schema_change_with_mow_txn_conflict", "p0") {
def customFeConfig = [
schema_change_max_retry_time: 10
]
setFeConfigTemporary(customFeConfig) {
try {
def tableName3 = "test_all_unique_mow"
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict")

def getJobState = { tableName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}

def getCreateViewState = { tableName ->
def createViewStateResult = sql """ SHOW ALTER TABLE MATERIALIZED VIEW WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return createViewStateResult[0][8]
}

def execStreamLoad = {
streamLoad {
table "${tableName3}"

set 'column_separator', ','

file 'all_types.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(2500, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
}

sql """ DROP TABLE IF EXISTS ${tableName3} """

sql """
CREATE TABLE IF NOT EXISTS ${tableName3} (
`k1` int(11) NULL,
`k2` tinyint(4) NULL,
`k3` smallint(6) NULL,
`k4` int(30) NULL,
`k5` largeint(40) NULL,
`k6` float NULL,
`k7` double NULL,
`k8` decimal(9, 0) NULL,
`k9` char(10) NULL,
`k10` varchar(1024) NULL,
`k11` text NULL,
`k12` date NULL,
`k13` datetime NULL
) ENGINE=OLAP
unique KEY(k1, k2, k3)
DISTRIBUTED BY HASH(`k1`) BUCKETS 5
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
"""
execStreamLoad()

sql """ alter table ${tableName3} modify column k4 string NULL"""

Awaitility.await().atMost(600, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
{
String res = getJobState(tableName3)
if (res == "FINISHED" || res == "CANCELLED") {
assertEquals("FINISHED", res)
return true
}
execStreamLoad()
return false
}
)

sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """
Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
{
String res = getJobState(tableName3)
if (res == "FINISHED" || res == "CANCELLED") {
assertEquals("FINISHED", res)
return true
}
execStreamLoad()
return false
}
)
sql """ insert into ${tableName3} values (10001, 2, 3, 4, 5, 6.6, 1.7, 8.8,
'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """
//
// sql """ alter table ${tableName3} modify column k5 string NULL"""
// Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
// {
// String res = getJobState(tableName3)
// if (res == "FINISHED" || res == "CANCELLED") {
// assertEquals("FINISHED", res)
// return true
// }
// execStreamLoad()
// return false
// }
// )
//
// sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """
// sql """ insert into ${tableName3} values (10001, 2, 3, 4, 5, 6.6, 1.7, 8.8,
// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """
//
// sql """ alter table ${tableName3} modify column v14 int NULL default "1" """
//
// int cnt = 6000
// Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
// {
// String res = getJobState(tableName3)
// if (res == "FINISHED" || res == "CANCELLED") {
// assertEquals("FINISHED", res)
// return true
// }
// cnt--;
// int val = 100000 + cnt
// sql """ insert into ${tableName3} values (${val}, 2, 3, 4, 5, 6.6, 1.7, 8.8,
// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 9527) """
// return false
// }
// )
//
// sql """ alter table ${tableName3} drop column v14 """
// execStreamLoad()
//
// sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """
//
// sql """ insert into ${tableName3} values (10002, 2, 3, 4, 5, 6.6, 1.7, 8.8,
// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """
//
// sql """ alter table ${tableName3} drop column v14 """
//
// sql """ alter table ${tableName3} add column v14 bitmap after k13 """
//
// sql """ insert into ${tableName3} values (10002, 2, 3, 4, 5, 6.6, 1.7, 8.8,
// 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', to_bitmap(243)) """
//
// sql """ alter table ${tableName3} drop column v14 """
//
List<List<Object>> result = sql """ select * from ${tableName3} """
for (row : result) {
logger.info("r1=" +row[1] )
logger.info("r2=" +row[2] )
logger.info("r3=" +row[3] )
logger.info("r4=" +row[4] )
assertEquals(2, row[1]);
assertEquals(3, row[2]);
assertEquals("4", row[3]);
assertEquals("5", row[4]);
}
} finally {
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict")
}
}


}

0 comments on commit 5125477

Please sign in to comment.