Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Mar 27, 2024
1 parent 706500c commit 5d9d40f
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ static ArrowFieldConverter createConverter(LogicalType flinkType, Field arrowFie
ArrowFieldConverter keyConverter = createConverter(
mapType.getKeyType(), structField.getChildren().get(0));
ArrowFieldConverter valueConverter = createConverter(
mapType.getKeyType(), structField.getChildren().get(1));
mapType.getValueType(), structField.getChildren().get(1));
return new MapConverter(mapType.isNullable(), keyConverter, valueConverter);
default:
throw new UnsupportedOperationException("Unsupported type " + flinkType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

public class StarRocksSourceITTest extends StarRocksITTestBase {

@Test
public void testArrayType() throws Exception {
String tableName = createComplexTypeTable("testComplexType");
String tableName = createArrayTypeTable("testArrayType");
executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName,
"0, [true], [2], [3], [4], [5], [6.6], [7.7], [8.8], ['2024-03-22'], ['2024-03-22 12:00:00'], ['11'], ['12'], " +
"[[true], [true]], [[2], [2]], [[3], [3]], [[4], [4]], [[5], [5]], [[6.6], [6.6]], " +
Expand Down Expand Up @@ -128,7 +130,7 @@ public void testArrayType() throws Exception {
assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row));
}

private String createComplexTypeTable(String tablePrefix) throws Exception {
private String createArrayTypeTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
Expand Down Expand Up @@ -168,4 +170,169 @@ private String createComplexTypeTable(String tablePrefix) throws Exception {
executeSrSQL(createStarRocksTable);
return tableName;
}

@Test
public void testStructType() throws Exception {
String tableName = createStructTypeTable("testStructType");
executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName,
"1, row(1, '1'), row('2024-03-27', 8.9)"
)
);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String createSrcSQL = "CREATE TABLE sr_src(" +
"c0 INT," +
"c1 ROW<id INT, name STRING>," +
"c2 ROW<ts DATE, score DECIMAL(38, 10)>" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
"'password' = ''" +
")";
tEnv.executeSql(createSrcSQL);
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.executeSql("SELECT * FROM sr_src").collect());
Row row = Row.of(
1,
Row.of(1, "1"),
Row.of(LocalDate.of(2024, 3, 27), new BigDecimal("8.9000000000"))
);
assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row));

}

private String createStructTypeTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 STRUCT<id INT, name STRING>," +
"c2 STRUCT<ts DATE, score DECIMAL(38, 10)>" +
") ENGINE = OLAP " +
"DUPLICATE KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}

@Test
public void testMapType() throws Exception {
String tableName = createMapTypeTable("testMapType");
executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName,
"1, map{1:'1'}, map{'2024-03-27':8.9}"
)
);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String createSrcSQL = "CREATE TABLE sr_src(" +
"c0 INT," +
"c1 MAP<INT, STRING>," +
"c2 MAP<DATE, DECIMAL(38, 10)>" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
"'password' = ''" +
")";
tEnv.executeSql(createSrcSQL);
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.executeSql("SELECT * FROM sr_src").collect());
Map<Integer, String> c1 = new HashMap<>();
c1.put(1, "1");
Map<LocalDate, BigDecimal> c2 = new HashMap<>();
c2.put(LocalDate.of(2024, 3, 27), new BigDecimal("8.9000000000"));
Row row = Row.of(1, c1, c2);
assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row));
}

private String createMapTypeTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 MAP<INT, STRING>," +
"c2 MAP<DATE, DECIMAL(38, 10)>" +
") ENGINE = OLAP " +
"DUPLICATE KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}

@Test
public void testNestedType() throws Exception {
String tableName = createNestedTypeTable("testNestedType");
executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName,
"1, row(1, [1, 11, 111], map{1:'1'})"
)
);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String createSrcSQL = "CREATE TABLE sr_src(" +
"c0 INT," +
"c1 ROW<a INT, b ARRAY<INT>, c MAP<INT, STRING>>" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
"'password' = ''" +
")";
tEnv.executeSql(createSrcSQL);
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.executeSql("SELECT * FROM sr_src").collect());
Map<Integer, String> c = new HashMap<>();
c.put(1, "1");
Row row = Row.of(1, Row.of(1, new Integer[] {1, 11, 111}, c));
assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row));
}

private String createNestedTypeTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 STRUCT<a INT, b ARRAY<INT>, c MAP<INT, STRING>>" +
") ENGINE = OLAP " +
"DUPLICATE KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void checkFlinkRows(StarRocksSourceFlinkRows flinkRows) {
assertTrue(currentObj instanceof DecimalData);
DecimalData cur = (DecimalData)currentObj;
assertTrue(cur.toUnscaledLong() == -3141291000L);
assertTrue(cur.precision() == 10);
assertTrue(cur.precision() == 27);
assertTrue(cur.scale() == 9);
}
}
Expand Down

0 comments on commit 5d9d40f

Please sign in to comment.