Skip to content

Commit

Permalink
111
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong committed Dec 11, 2024
1 parent 08e33c1 commit 69a5e48
Showing 1 changed file with 34 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void testNonPkTableRead() throws Exception {
void testTableProjectPushDown(String mode) throws Exception {
boolean isPkTable = mode.startsWith("PK");
boolean testPkLog = mode.equals("PK_LOG");
String tableName = "table_" + mode;
String tableName = "table_projection_push_down" + mode;
String pkDDL = isPkTable ? ", primary key (a) not enforced" : "";
tEnv.executeSql(
String.format(
Expand Down Expand Up @@ -311,7 +311,9 @@ void testPkTableReadMixSnapshotAndLog() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws Exception {
String tableName = "tab1_" + (isPartitioned ? "partitioned" : "non_partitioned");
String tableName =
"tab1_read_with_different_mode_"
+ (isPartitioned ? "partitioned" : "non_partitioned");
String partitionName = null;
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
if (!isPartitioned) {
Expand Down Expand Up @@ -543,10 +545,10 @@ void testReadPrimaryKeyPartitionedTable() throws Exception {
new DataField("b", com.alibaba.fluss.types.DataTypes.STRING()),
new DataField("c", com.alibaba.fluss.types.DataTypes.STRING()));
tEnv.executeSql(
"create table partitioned_table"
"create table primary_key_partitioned_table_1"
+ " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) "
+ "with ('table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'year')");
TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table");
TablePath tablePath = TablePath.of(DEFAULT_DB, "primary_key_partitioned_table_1");

// write data into partitions and wait snapshot is done
Map<Long, String> partitionNameById =
Expand All @@ -556,7 +558,7 @@ void testReadPrimaryKeyPartitionedTable() throws Exception {
waitUtilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());

org.apache.flink.util.CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from partitioned_table").collect();
tEnv.executeSql("select * from primary_key_partitioned_table_1").collect();
assertResultsIgnoreOrder(rowIter, expectedRowValues, false);

// then create some new partitions, and write rows to the new partitions
Expand Down Expand Up @@ -584,7 +586,9 @@ private static Stream<Arguments> lookupArgs() {
@ParameterizedTest
@MethodSource("lookupArgs")
void testLookup1PkTable(Caching caching, boolean async) throws Exception {
String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null);
String dim =
prepareDimTableAndSourceTable(
caching, async, new String[] {"id"}, null, "t_lookup1");
String dimJoinQuery =
String.format(
"SELECT a, c, h.name FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
Expand All @@ -603,7 +607,9 @@ void testLookup1PkTable(Caching caching, boolean async) throws Exception {
@ParameterizedTest
@MethodSource("lookupArgs")
void testLookup1PkTableWith2Conditions(Caching caching, boolean async) throws Exception {
String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null);
String dim =
prepareDimTableAndSourceTable(
caching, async, new String[] {"id"}, null, "t_lookup2");
String dimJoinQuery =
String.format(
"SELECT a, b, h.name FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
Expand All @@ -622,7 +628,9 @@ void testLookup1PkTableWith2Conditions(Caching caching, boolean async) throws Ex
@ParameterizedTest
@MethodSource("lookupArgs")
void testLookup1PkTableWith3Conditions(Caching caching, boolean async) throws Exception {
String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null);
String dim =
prepareDimTableAndSourceTable(
caching, async, new String[] {"id"}, null, "t_lookup3");
String dimJoinQuery =
String.format(
"SELECT a, b, c, h.address FROM src LEFT JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
Expand All @@ -644,7 +652,8 @@ void testLookup1PkTableWith3Conditions(Caching caching, boolean async) throws Ex
@MethodSource("lookupArgs")
void testLookup2PkTable(Caching caching, boolean async) throws Exception {
String dim =
prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null);
prepareDimTableAndSourceTable(
caching, async, new String[] {"id", "name"}, null, "t_lookup4");
String dimJoinQuery =
String.format(
"SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
Expand All @@ -665,7 +674,8 @@ void testLookup2PkTable(Caching caching, boolean async) throws Exception {
void testLookup2PkTableWithUnorderedKey(Caching caching, boolean async) throws Exception {
// the primary key is (name, id) but the schema order is (id, name)
String dim =
prepareDimTableAndSourceTable(caching, async, new String[] {"name", "id"}, null);
prepareDimTableAndSourceTable(
caching, async, new String[] {"name", "id"}, null, "t_lookup5");
String dimJoinQuery =
String.format(
"SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
Expand All @@ -685,7 +695,8 @@ void testLookup2PkTableWithUnorderedKey(Caching caching, boolean async) throws E
@MethodSource("lookupArgs")
void testLookup2PkTableWith1KeyInCondition(Caching caching, boolean async) throws Exception {
String dim =
prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null);
prepareDimTableAndSourceTable(
caching, async, new String[] {"id", "name"}, null, "t_lookup6");
String dimJoinQuery =
String.format(
"SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
Expand All @@ -706,7 +717,8 @@ void testLookup2PkTableWith1KeyInCondition(Caching caching, boolean async) throw
@MethodSource("lookupArgs")
void testLookup2PkTableWith3Conditions(Caching caching, boolean async) throws Exception {
String dim =
prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null);
prepareDimTableAndSourceTable(
caching, async, new String[] {"id", "name"}, null, "t_lookup7");
String dimJoinQuery =
String.format(
"SELECT a, h.name, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h"
Expand All @@ -721,7 +733,9 @@ void testLookup2PkTableWith3Conditions(Caching caching, boolean async) throws Ex
@ParameterizedTest
@MethodSource("lookupArgs")
void testLookupPartitionedTable(Caching caching, boolean async) throws Exception {
String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, "p_date");
String dim =
prepareDimTableAndSourceTable(
caching, async, new String[] {"id"}, "p_date", "t_lookup8");

String dimJoinQuery =
String.format(
Expand Down Expand Up @@ -766,7 +780,11 @@ private InternalRow genRow(boolean isPkTable, RowType rowType, Object[] objects)
* @return the table name of the dim table
*/
private String prepareDimTableAndSourceTable(
Caching caching, boolean async, String[] keys, @Nullable String partitionedKey)
Caching caching,
boolean async,
String[] keys,
@Nullable String partitionedKey,
String tablePrefix)
throws Exception {
String options = async ? "'lookup.async' = 'true'" : "'lookup.async' = 'false'";
if (caching == Caching.ENABLE_CACHE) {
Expand All @@ -779,7 +797,8 @@ private String prepareDimTableAndSourceTable(
// create dim table
String tableName =
String.format(
"lookup_test_%s_%s_pk_%s_%s",
"%s_lookup_test_%s_%s_pk_%s_%s",
tablePrefix,
caching.name().toLowerCase(),
async ? "async" : "sync",
String.join("_", keys),
Expand Down

0 comments on commit 69a5e48

Please sign in to comment.