From 5d9d40f171f7b0ed64f42c8e4a0bf154e3e0c453 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Wed, 27 Mar 2024 16:05:52 +0800 Subject: [PATCH] Add test Signed-off-by: PengFei Li --- .../flink/row/source/ArrowFieldConverter.java | 2 +- .../it/source/StarRocksSourceITTest.java | 171 +++++++++++++++++- .../source/StarRocksSourceFlinkRowsTest.java | 2 +- 3 files changed, 171 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java b/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java index 464b08fa..c1e50e5c 100644 --- a/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java +++ b/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java @@ -476,7 +476,7 @@ static ArrowFieldConverter createConverter(LogicalType flinkType, Field arrowFie ArrowFieldConverter keyConverter = createConverter( mapType.getKeyType(), structField.getChildren().get(0)); ArrowFieldConverter valueConverter = createConverter( - mapType.getKeyType(), structField.getChildren().get(1)); + mapType.getValueType(), structField.getChildren().get(1)); return new MapConverter(mapType.isNullable(), keyConverter, valueConverter); default: throw new UnsupportedOperationException("Unsupported type " + flinkType); diff --git a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java index 24c9cfc0..1c17927f 100644 --- a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java @@ -32,7 +32,9 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -40,7 +42,7 @@ public class StarRocksSourceITTest extends StarRocksITTestBase { @Test public void testArrayType() throws Exception { - String tableName = createComplexTypeTable("testComplexType"); + String tableName = createArrayTypeTable("testArrayType"); executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName, "0, [true], [2], [3], [4], [5], [6.6], [7.7], [8.8], ['2024-03-22'], ['2024-03-22 12:00:00'], ['11'], ['12'], " + "[[true], [true]], [[2], [2]], [[3], [3]], [[4], [4]], [[5], [5]], [[6.6], [6.6]], " + @@ -128,7 +130,7 @@ public void testArrayType() throws Exception { assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row)); } - private String createComplexTypeTable(String tablePrefix) throws Exception { + private String createArrayTypeTable(String tablePrefix) throws Exception { String tableName = tablePrefix + "_" + genRandomUuid(); String createStarRocksTable = String.format( @@ -168,4 +170,169 @@ private String createComplexTypeTable(String tablePrefix) throws Exception { executeSrSQL(createStarRocksTable); return tableName; } + + @Test + public void testStructType() throws Exception { + String tableName = createStructTypeTable("testStructType"); + executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName, + "1, row(1, '1'), row('2024-03-27', 8.9)" + ) + ); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String createSrcSQL = "CREATE TABLE sr_src(" + + "c0 INT," + + "c1 ROW," + + "c2 ROW" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='" + getJdbcUrl() + "'," + + "'scan-url'='" + String.join(";", getHttpUrls()) + "'," + + "'database-name' = '" + DB_NAME + "'," + + "'table-name' = '" + tableName + "'," + + "'username' = 'root'," + + "'password' = ''" + + ")"; + tEnv.executeSql(createSrcSQL); + List results = + CollectionUtil.iteratorToList( + tEnv.executeSql("SELECT * FROM sr_src").collect()); + Row row = Row.of( + 1, + Row.of(1, "1"), + Row.of(LocalDate.of(2024, 3, 27), new BigDecimal("8.9000000000")) + ); + assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row)); + + } + + private String createStructTypeTable(String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 STRUCT," + + "c2 STRUCT" + + ") ENGINE = OLAP " + + "DUPLICATE KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + return tableName; + } + + @Test + public void testMapType() throws Exception { + String tableName = createMapTypeTable("testMapType"); + executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName, + "1, map{1:'1'}, map{'2024-03-27':8.9}" + ) + ); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String createSrcSQL = "CREATE TABLE sr_src(" + + "c0 INT," + + "c1 MAP," + + "c2 MAP" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='" + getJdbcUrl() + "'," + + "'scan-url'='" + String.join(";", getHttpUrls()) + "'," + + "'database-name' = '" + DB_NAME + "'," + + "'table-name' = '" + tableName + "'," + + "'username' = 'root'," + + "'password' = ''" + + ")"; + tEnv.executeSql(createSrcSQL); + List results = + CollectionUtil.iteratorToList( + tEnv.executeSql("SELECT * FROM sr_src").collect()); + Map c1 = new HashMap<>(); + c1.put(1, "1"); + Map c2 = new HashMap<>(); + c2.put(LocalDate.of(2024, 3, 27), new BigDecimal("8.9000000000")); + Row row = Row.of(1, c1, c2); + assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row)); + } + + private String createMapTypeTable(String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 MAP," + + "c2 MAP" + + ") ENGINE = OLAP " + + "DUPLICATE KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + return tableName; + } + + @Test + public void testNestedType() throws Exception { + String tableName = createNestedTypeTable("testNestedType"); + executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName, + "1, row(1, [1, 11, 111], map{1:'1'})" + ) + ); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String createSrcSQL = "CREATE TABLE sr_src(" + + "c0 INT," + + "c1 ROW, c MAP>" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='" + getJdbcUrl() + "'," + + "'scan-url'='" + String.join(";", getHttpUrls()) + "'," + + "'database-name' = '" + DB_NAME + "'," + + "'table-name' = '" + tableName + "'," + + "'username' = 'root'," + + "'password' = ''" + + ")"; + tEnv.executeSql(createSrcSQL); + List results = + CollectionUtil.iteratorToList( + tEnv.executeSql("SELECT * FROM sr_src").collect()); + Map c = new HashMap<>(); + c.put(1, "1"); + Row row = Row.of(1, Row.of(1, new Integer[] {1, 11, 111}, c)); + assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row)); + } + + private String createNestedTypeTable(String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 STRUCT, c MAP>" + + ") ENGINE = OLAP " + + "DUPLICATE KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + return tableName; + } } diff --git a/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java b/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java index 49b49d0e..e6e3bafd 100644 --- a/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java +++ b/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java @@ -142,7 +142,7 @@ public void checkFlinkRows(StarRocksSourceFlinkRows flinkRows) { assertTrue(currentObj instanceof DecimalData); DecimalData cur = (DecimalData)currentObj; assertTrue(cur.toUnscaledLong() == -3141291000L); - assertTrue(cur.precision() == 10); + assertTrue(cur.precision() == 27); assertTrue(cur.scale() == 9); } }