Skip to content

Commit

Permalink
[Test] Add tests for reading and write StarRocks DATETIME (#387)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Sep 19, 2024
1 parent aa35494 commit 353ab2d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import com.starrocks.connector.flink.tools.DataUtil;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DecimalVector;
Expand Down Expand Up @@ -270,8 +269,8 @@ public Object convert(FieldVector vector, int rowIndex) {
// Convert from arrow varchar to flink timestamp-related type
class TimestampConverter implements ArrowFieldConverter {

private static final String DATETIME_FORMAT_LONG = "yyyy-MM-dd HH:mm:ss.SSSSSS";
private static final String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss";
private static final DateTimeFormatter DATETIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSSSSS]");

private final boolean isNullable;

Expand All @@ -287,15 +286,7 @@ public Object convert(FieldVector vector, int rowIndex) {
if (value == null) {
return null;
}
if (value.length() < DATETIME_FORMAT_SHORT.length()) {
throw new IllegalArgumentException("Date value length shorter than DATETIME_FORMAT_SHORT");
}
if (value.length() == DATETIME_FORMAT_SHORT.length()) {
value = DataUtil.addZeroForNum(value + ".", DATETIME_FORMAT_LONG.length());
}
value = DataUtil.addZeroForNum(value, DATETIME_FORMAT_LONG.length());
DateTimeFormatter df = DateTimeFormatter.ofPattern(DATETIME_FORMAT_LONG);
LocalDateTime ldt = LocalDateTime.parse(value, df);
LocalDateTime ldt = LocalDateTime.parse(value, DATETIME_FORMATTER);
return TimestampData.fromLocalDateTime(ldt);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -739,4 +740,70 @@ public void testJsonLz4Compression() throws Exception {
map.put("sink.at-least-once.use-transaction-stream-load", "false");
testConfigurationBase(map, env -> null);
}

@Test
public void testTimestampType() throws Exception {
String tableName = createDatetimeTable("testTimestampType");
StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", getJdbcUrl())
.withProperty("load-url", getHttpUrls())
.withProperty("database-name", DB_NAME)
.withProperty("table-name", tableName)
.withProperty("username", "root")
.withProperty("password", "")
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv;
tEnv = StreamTableEnvironment.create(env);
String createSQL = "CREATE TABLE sink(" +
"c0 INT," +
"c1 TIMESTAMP" +
", PRIMARY KEY (`c0`) NOT ENFORCED" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
"'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
"'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + sinkOptions.getTableName() + "'," +
"'username' = '" + sinkOptions.getUsername() + "'," +
"'password' = '" + sinkOptions.getPassword() + "'" +
")";
tEnv.executeSql(createSQL);
tEnv.executeSql("INSERT INTO sink VALUES " +
"(0, CAST('2024-09-19 14:00:00' AS TIMESTAMP(9)))," +
"(1, CAST('2024-09-19 14:01:00.123' AS TIMESTAMP(9)))," +
"(2, CAST('2024-09-19 14:02:00.123456' AS TIMESTAMP(9)))," +
"(3, CAST('2024-09-19 14:03:00.123456789' AS TIMESTAMP(9)))"
).await();
List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(0, Timestamp.valueOf("2024-09-19 14:00:00")),
Arrays.asList(1, Timestamp.valueOf("2024-09-19 14:01:00.123")),
Arrays.asList(2, Timestamp.valueOf("2024-09-19 14:02:00.123456")),
Arrays.asList(3, Timestamp.valueOf("2024-09-19 14:03:00.123456"))
);
List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

private String createDatetimeTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 DATETIME" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -463,4 +463,58 @@ private String createDimTable(String tablePrefix) throws Exception {
return tableName;
}

@Test
public void testTimestampType() throws Exception {
String tableName = createDatetimeTable("testNestedType");
executeSrSQL("INSERT INTO " + DB_NAME + "." + tableName + " VALUES " +
"(0, '2024-09-19 14:00:00')," +
"(1, '2024-09-19 14:01:00.123')," +
"(2, '2024-09-19 14:02:00.123456')"
);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String createSrcSQL = "CREATE TABLE sr_src(" +
"c0 INT," +
"c1 TIMESTAMP" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
"'password' = ''" +
")";
tEnv.executeSql(createSrcSQL);
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.executeSql("SELECT * FROM sr_src").collect());
List<Row> expected = Arrays.asList(
Row.of(0, LocalDateTime.of(2024, 9, 19, 14, 0, 0)),
Row.of(1, LocalDateTime.of(2024, 9, 19, 14, 1, 0, 123000000)),
Row.of(2, LocalDateTime.of(2024, 9, 19, 14, 2, 0, 123456000))
);
assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}

private String createDatetimeTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 DATETIME" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}
}

0 comments on commit 353ab2d

Please sign in to comment.