diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 2b5f04550cfe2a..ff2c507726031d 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -1070,7 +1070,12 @@ Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJ req.mutable_job()->CopyFrom(job); req.set_action(FinishTabletJobRequest::COMMIT); req.set_cloud_unique_id(config::cloud_unique_id); - return retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job); + auto st = retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job); + if (res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { + return Status::Error( + "txn conflict when commit tablet job {}", job.ShortDebugString()); + } + return st; } Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) { @@ -1205,6 +1210,17 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in << "ms : " << res.status().msg(); bthread_usleep(duration_ms * 1000); } while (++retry_times <= 100); + if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { + return Status::Error( + "txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, " + "initiator {}", + tablet.table_id(), lock_id, initiator); + } else if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) { + return Status::Error( + "lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, " + "initiator {}", + tablet.table_id(), lock_id, initiator); + } return st; } diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 14781da0f37f97..81b07ff7e4c843 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -392,6 +392,13 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } cloud::FinishTabletJobResponse finish_resp; + DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict", { + std::srand(static_cast(std::time(nullptr))); + int random_value = std::rand() % 100; + if (random_value < 50) { + return Status::Error("test txn conflict"); + } + }); auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); if (!st.ok()) { if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) { diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a8b0bd15e84ce0..4a6833455560b3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3306,6 +3306,14 @@ public static int metaServiceRpcRetryTimes() { "Maximal concurrent num of get tablet stat job."}) public static int max_get_tablet_stat_task_threads_num = 4; + @ConfField(mutable = true, description = {"存算分离模式下schema change失败是否重试", + "Whether to enable retry when schema change failed in cloud model, default is true."}) + public static boolean enable_schema_change_retry_in_cloud_mode = true; + + @ConfField(mutable = true, description = {"存算分离模式下schema change重试次数", + "Max retry times when schema change failed in cloud model, default is 3."}) + public static int schema_change_max_retry_time = 3; + // ATTN: DONOT add any config not related to cloud mode here // ATTN: DONOT add any config not related to cloud mode here // ATTN: DONOT add any config not related to cloud mode here diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index bea703a29d75fa..bc5b8c5031c79a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -56,6 +56,7 @@ import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -577,7 +578,16 @@ protected void runRunningJob() throws AlterCancelException { List tasks = schemaChangeBatchTask.getUnfinishedTasks(2000); ensureCloudClusterExist(tasks); for (AgentTask task : tasks) { - if (task.getFailedTimes() > 0) { + int maxFailedTimes = 0; + if (Config.isCloudMode() && Config.enable_schema_change_retry_in_cloud_mode) { + if (task.getErrorCode() != null && task.getErrorCode() + .equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) { + maxFailedTimes = Config.schema_change_max_retry_time; + LOG.warn("schema change task failed: {}, set maxFailedTimes {}", task.getErrorMsg(), + maxFailedTimes); + } + } + if (task.getFailedTimes() > maxFailedTimes) { task.setFinished(true); AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature()); LOG.warn("schema change task failed: {}", task.getErrorMsg()); diff --git a/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy b/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy new file mode 100644 index 00000000000000..448ae3bc0fbea1 --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_schema_change_with_mow_txn_conflict.groovy @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_schema_change_with_mow_txn_conflict", "p0") { + def customFeConfig = [ + schema_change_max_retry_time: 10 + ] + setFeConfigTemporary(customFeConfig) { + try { + def tableName3 = "test_all_unique_mow" + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict") + + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def getCreateViewState = { tableName -> + def createViewStateResult = sql """ SHOW ALTER TABLE MATERIALIZED VIEW WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return createViewStateResult[0][8] + } + + def execStreamLoad = { + streamLoad { + table "${tableName3}" + + set 'column_separator', ',' + + file 'all_types.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(2500, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + } + + sql """ DROP TABLE IF EXISTS ${tableName3} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k1` int(11) NULL, + `k2` tinyint(4) NULL, + `k3` smallint(6) NULL, + `k4` int(30) NULL, + `k5` largeint(40) NULL, + `k6` float NULL, + `k7` double NULL, + `k8` decimal(9, 0) NULL, + `k9` char(10) NULL, + `k10` varchar(1024) NULL, + `k11` text NULL, + `k12` date NULL, + `k13` datetime NULL + ) ENGINE=OLAP + unique KEY(k1, k2, k3) + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + execStreamLoad() + + sql """ alter table ${tableName3} modify column k4 string NULL""" + + Awaitility.await().atMost(600, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( + { + String res = getJobState(tableName3) + if (res == "FINISHED" || res == "CANCELLED") { + assertEquals("FINISHED", res) + return true + } + execStreamLoad() + return false + } + ) + + sql """ alter table ${tableName3} add column v14 int NOT NULL default "1" after k13 """ + Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until( + { + String res = getJobState(tableName3) + if (res == "FINISHED" || res == "CANCELLED") { + assertEquals("FINISHED", res) + return true + } + execStreamLoad() + return false + } + ) + sql """ insert into ${tableName3} values (10001, 2, 3, 4, 5, 6.6, 1.7, 8.8, + 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00', 10086) """ + List> result = sql """ select * from ${tableName3} """ + for (row : result) { + assertEquals(2, row[1]); + assertEquals(3, row[2]); + assertEquals("4", row[3]); + assertEquals("5", row[4]); + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict") + } + } + + +} \ No newline at end of file