
Commit

update routine load cases
XuJianxu committed Nov 5, 2024
1 parent b130340 commit 013f881
Showing 1 changed file with 0 additions and 198 deletions.
198 changes: 0 additions & 198 deletions regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -1402,88 +1402,6 @@ suite("test_routine_load","p0") {
}
}


// TODO: the kafka setup scripts need updating before this JSON (strip_outer_array) case can be re-enabled
// i = 0
// if (enabled != null && enabled.equalsIgnoreCase("true")) {
// try {
// for (String tableName in tables) {
// sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
// sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text

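// One JSON routine load job per target table, consuming the JSON-array topics from OFFSET_BEGINNING.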
// def name = "routine_load_" + tableName
// sql """
// CREATE ROUTINE LOAD ${jobs[i]} ON ${name}
// COLUMNS(${columns[i]})
// PROPERTIES
// (
// "format" = "json",
// "strip_outer_array" = "true",
// "fuzzy_parse" = "true",
// "max_batch_interval" = "5",
// "max_batch_rows" = "300000",
// "max_batch_size" = "209715200"
// )
// FROM KAFKA
// (
// "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
// "kafka_topic" = "${jsonArrayTopic[i]}",
// "property.kafka_default_offsets" = "OFFSET_BEGINNING"
// );
// """
// sql "sync"
// i++
// }

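// Poll SHOW ROUTINE LOAD until each job leaves NEED_SCHEDULE; it is expected to reach RUNNING.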
// i = 0
// for (String tableName in tables) {
// while (true) {
// sleep(1000)
// def res = sql "show routine load for ${jobs[i]}"
// def state = res[0][8].toString()
// if (state == "NEED_SCHEDULE") {
// continue;
// }
// log.info("reason of state changed: ${res[0][17].toString()}".toString())
// assertEquals(res[0][8].toString(), "RUNNING")
// break;
// }

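// Then wait for the loaded rows to become visible (up to 120 polls at 5s intervals).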
// def count = 0
// def tableName1 = "routine_load_" + tableName
// while (true) {
// def res = sql "select count(*) from ${tableName1}"
// def state = sql "show routine load for ${jobs[i]}"
// log.info("routine load state: ${state[0][8].toString()}".toString())
// log.info("routine load statistic: ${state[0][14].toString()}".toString())
// log.info("reason of state changed: ${state[0][17].toString()}".toString())
// if (res[0][0] > 0) {
// break
// }
// if (count >= 120) {
// log.error("routine load can not visible for long time")
// assertEquals(20, res[0][0])
// break
// }
// sleep(5000)
// count++
// }
// qt_sql_json_strip_outer_array "select * from ${tableName1} order by k00,k01,k02"

// sql "stop routine load for ${jobs[i]}"
// i++
// }
// } finally {
// for (String tableName in tables) {
// sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
// }
// }
// }

// multi_table
if (enabled != null && enabled.equalsIgnoreCase("true")) {
def j = 0
@@ -2128,121 +2046,5 @@ suite("test_routine_load","p0") {
}
}

// sequence
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
sql new File("""${context.file.parent}/ddl/uniq_tbl_basic_drop_sequence.sql""").text
sql new File("""${context.file.parent}/ddl/uniq_tbl_basic_create_sequence.sql""").text

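// Sequence-column case: load pipe-separated CSV into the unique-key table created by uniq_tbl_basic_create_sequence.sql.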
def name = "routine_load_uniq_tbl_basic_sequence"
def job = "sequence_job"
sql """
CREATE ROUTINE LOAD ${job} ON ${name}
COLUMNS(${columns[0]}),
COLUMNS TERMINATED BY "|"
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${topics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"

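// Poll SHOW ROUTINE LOAD until the job leaves NEED_SCHEDULE; it should reach RUNNING.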
while (true) {
sleep(1000)
def res = sql "show routine load for ${job}"
def state = res[0][8].toString()
if (state == "NEED_SCHEDULE") {
continue;
}
log.info("reason of state changed: ${res[0][17].toString()}".toString())
assertEquals(res[0][8].toString(), "RUNNING")
break;
}

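// Wait for loaded rows to become visible, polling at 5s intervals up to 120 times.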
def count = 0
while (true) {
def res = sql "select count(*) from ${name}"
def state = sql "show routine load for ${job}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
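// With a sequence column, the row carrying the highest sequence value wins per key, so the snapshot below checks the replace-order semantics.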
qt_sql_squence "select * from routine_load_uniq_tbl_basic_sequence order by k00,k01,k02"
sql "stop routine load for ${job}"
} finally {
sql new File("""${context.file.parent}/ddl/uniq_tbl_basic_drop_sequence.sql""").text
}
}

// error command
i = 0
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
for (String tableName in tables) {
sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text

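// The COLUMNS clause includes k30, a column the target tables do not define, so consumed rows are expected to fail mapping and count as error rows.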
def name = "routine_load_" + tableName
sql """
CREATE ROUTINE LOAD ${jobs[i]} ON ${name}
COLUMNS(${columns[i]},k30),
COLUMNS TERMINATED BY "|"
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${topics[i]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"
i++
}

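// Each job is expected to PAUSE once error rows exceed max_error_number; verify the pause reason, then stop the job.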
i = 0
for (String tableName in tables) {
while (true) {
sleep(1000)
def res = sql "show routine load for ${jobs[i]}"
def state = res[0][8].toString()
if (state != "PAUSED") {
continue;
}
log.info("reason of state changed: ${res[0][17].toString()}".toString())
assertEquals(res[0][17].toString(), "ErrorReason{code=errCode = 102, msg='current error rows is more than max_error_number or the max_filter_ratio is more than the value set'}")
break;
}

sql "stop routine load for ${jobs[i]}"
i++
}
} finally {
for (String tableName in tables) {
sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
}
}
}
}
