From ec5ebeb27bd4e9f8a557cf2137eeee4f370ad8b2 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 6 Mar 2024 17:36:50 +0800 Subject: [PATCH] [ci] Fix paimon flink tests to cover submodule versions (#2949) --- .github/workflows/utitcase-flink.yml | 7 ++++- .../flink/ContinuousFileStoreITCase.java | 29 ++++++++++++++--- .../flink/ContinuousFileStoreITCase.java | 29 ++++++++++++++--- .../paimon/flink/SchemaChangeITCase.java | 5 +-- paimon-flink/paimon-flink-common/pom.xml | 29 ++--------------- paimon-flink/pom.xml | 31 +++++++++++++++++++ 6 files changed, 90 insertions(+), 40 deletions(-) diff --git a/.github/workflows/utitcase-flink.yml b/.github/workflows/utitcase-flink.yml index 3b543b3a2f5d..a5c83366f9a8 100644 --- a/.github/workflows/utitcase-flink.yml +++ b/.github/workflows/utitcase-flink.yml @@ -51,6 +51,11 @@ jobs: . .github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone + test_modules="" + for suffix in 1.14 1.15 1.16 1.17 1.18 common; do + test_modules+="org.apache.paimon:paimon-flink-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index 21211e65ea60..0e14aaeebe39 100644 --- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -42,8 +42,9 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase { @Override protected List ddl() { return Arrays.asList( - "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)", - "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED) WITH('changelog-producer'='input')"); + "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')", + "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)" + + " WITH ('changelog-producer'='input', 'bucket' = '1')"); } @Test @@ -202,12 +203,12 @@ public void testConfigureStartupTimestamp() throws Exception { @Test public void testConfigureStartupSnapshot() throws Exception { // Configure 'scan.snapshot-id' without 'scan.mode'. + batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')"); + batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')"); BlockingIterator iterator = BlockingIterator.of( streamSqlIter( "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1)); - batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')"); - batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')"); assertThat(iterator.collect(2)) .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); iterator.close(); @@ -221,13 +222,31 @@ public void testConfigureStartupSnapshot() throws Exception { .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); iterator.close(); + iterator = + BlockingIterator.of( + streamSqlIter( + "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1)); + assertThat(iterator.collect(2)) + .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); + iterator.close(); + + iterator = + BlockingIterator.of( + streamSqlIter( + "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 2)); + assertThat(iterator.collect(2)) + .containsExactlyInAnyOrder(Row.of("7", "8", "9"), (Row.of("10", "11", "12"))); + iterator.close(); + // Configure 'scan.snapshot-id' with 'scan.mode=latest'. assertThatThrownBy( () -> streamSqlIter( "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */", 0)) - .hasMessageContaining("Unable to create a source for reading table"); + .hasCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "scan.snapshot-id must be null when you use latest for scan.mode"); } @Test diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index 21211e65ea60..0e14aaeebe39 100644 --- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -42,8 +42,9 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase { @Override protected List ddl() { return Arrays.asList( - "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)", - "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED) WITH('changelog-producer'='input')"); + "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')", + "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)" + + " WITH ('changelog-producer'='input', 'bucket' = '1')"); } @Test @@ -202,12 +203,12 @@ public void testConfigureStartupTimestamp() throws Exception { @Test public void testConfigureStartupSnapshot() throws Exception { // Configure 'scan.snapshot-id' without 'scan.mode'. + batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')"); + batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')"); BlockingIterator iterator = BlockingIterator.of( streamSqlIter( "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1)); - batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')"); - batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')"); assertThat(iterator.collect(2)) .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); iterator.close(); @@ -221,13 +222,31 @@ public void testConfigureStartupSnapshot() throws Exception { .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); iterator.close(); + iterator = + BlockingIterator.of( + streamSqlIter( + "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1)); + assertThat(iterator.collect(2)) + .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); + iterator.close(); + + iterator = + BlockingIterator.of( + streamSqlIter( + "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 2)); + assertThat(iterator.collect(2)) + .containsExactlyInAnyOrder(Row.of("7", "8", "9"), (Row.of("10", "11", "12"))); + iterator.close(); + // Configure 'scan.snapshot-id' with 'scan.mode=latest'. assertThatThrownBy( () -> streamSqlIter( "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */", 0)) - .hasMessageContaining("Unable to create a source for reading table"); + .hasCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "scan.snapshot-id must be null when you use latest for scan.mode"); } @Test diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index d0553c8c7f63..663319ec989a 100644 --- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -44,7 +44,7 @@ public void testSetAndRemoveOption() throws Exception { } @Test - public void testSetAndResetImmutableOptions() throws Exception { + public void testSetAndResetImmutableOptions() { // bucket-key is immutable sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)"); @@ -53,7 +53,8 @@ public void testSetAndResetImmutableOptions() throws Exception { .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Change 'bucket-key' is not supported yet."); - sql("CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket-key' = 'c')"); + sql( + "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')")) .getRootCause() .isInstanceOf(UnsupportedOperationException.class) diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 76b2dc4ff9fd..74c53a85b298 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -113,34 +113,9 @@ under the License. org.apache.avro avro - - - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - provided - - org.apache.avro - avro - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - jdk.tools - jdk.tools - - - com.google.protobuf - protobuf-java + org.apache.hadoop + hadoop-common diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml index b88f541c6ed2..df6ee6acba77 100644 --- a/paimon-flink/pom.xml +++ b/paimon-flink/pom.xml @@ -63,6 +63,37 @@ under the License. ${project.version} + + org.apache.hadoop + hadoop-common + ${hadoop.version} + provided + + + org.apache.avro + avro + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + jdk.tools + jdk.tools + + + com.google.protobuf + protobuf-java + + + + + + org.apache.paimon paimon-core