diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java index 8f0571da..1064952e 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java @@ -141,8 +141,11 @@ public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema } String srType = srColumn.get("DATA_TYPE").toString().toLowerCase(); - boolean typeMatched = typesMap.containsKey(srType) && - typesMap.get(srType).contains(column.getType().getLogicalType().getTypeRoot()); + // Some types of StarRocks, such as json, are not mapped to Flink natively, + // and there will be no entry in typesMap, but they can be represented as + // STRING in Flink generally, so we think the type is matched even if + // typesMap does not contain the srType + boolean typeMatched = !typesMap.containsKey(srType) || typesMap.get(srType).contains(column.getType().getLogicalType().getTypeRoot()); if (!typeMatched) { throw new IllegalArgumentException( String.format("Flink and StarRocks types are not matched for column %s, " + diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 517b7c9a..36a5780a 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -545,4 +545,57 @@ private String createPkTable(String tablePrefix) throws Exception { executeSrSQL(createStarRocksTable); return tableName; } + + @Test + public void testUnalignedTypes() throws Exception { + String tableName = "testUnalignedTypes_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 LARGEINT," + + "c2 JSON" + + ") ENGINE = OLAP " + + "PRIMARY KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder() + .withProperty("jdbc-url", getJdbcUrl()) + .withProperty("load-url", getHttpUrls()) + .withProperty("database-name", DB_NAME) + .withProperty("table-name", tableName) + .withProperty("username", "root") + .withProperty("password", "") + .build(); + + String createSQL = "CREATE TABLE sink(" + + "c0 INT," + + "c1 STRING," + + "c2 STRING," + + "PRIMARY KEY (`c0`) NOT ENFORCED" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," + + "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," + + "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," + + "'database-name' = '" + DB_NAME + "'," + + "'table-name' = '" + sinkOptions.getTableName() + "'," + + "'username' = '" + sinkOptions.getUsername() + "'," + + "'password' = '" + sinkOptions.getPassword() + "'" + + ")"; + + tEnv.executeSql(createSQL); + tEnv.executeSql("INSERT INTO sink VALUES (1, '123', '{\"key\": 1, \"value\": 2}')").await(); + List> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName); + verifyResult(Collections.singletonList(Arrays.asList(1, "123", "{\"key\": 1, \"value\": 2}")), actualData); + } }