From 3ca6fc0494428e490e05d196df96f05247b35d57 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Wed, 10 Apr 2024 14:45:58 +0800 Subject: [PATCH] [flink] Set default recover strategy for TableEnvironment and StreamExecutionEnvironment in tests (#3133) --- .../flink/action/cdc/CdcActionITCaseBase.java | 11 +- .../flink/kafka/KafkaTableTestBase.java | 4 +- .../cdc/FlinkCdcSyncDatabaseSinkITCase.java | 13 +- .../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 13 +- .../lookup/AsyncLookupFunctionWrapper.java | 5 + .../paimon/flink/CatalogITCaseBase.java | 9 +- .../apache/paimon/flink/FileStoreITCase.java | 20 +- .../paimon/flink/FileSystemCatalogITCase.java | 20 +- .../apache/paimon/flink/FlinkTestBase.java | 16 +- .../apache/paimon/flink/LookupJoinITCase.java | 2 +- .../paimon/flink/MappingTableITCase.java | 3 +- .../flink/PrimaryKeyFileStoreTableITCase.java | 129 ++++++----- .../paimon/flink/action/ActionITCaseBase.java | 42 ++-- .../flink/action/CompactActionITCase.java | 7 +- .../action/CompactDatabaseActionITCase.java | 18 +- .../flink/sink/CompactorSinkITCase.java | 7 +- .../flink/sink/SinkSavepointITCase.java | 55 ++--- .../flink/source/CompactorSourceITCase.java | 9 +- ...ltiTablesCompactorSourceBuilderITCase.java | 15 +- .../paimon/flink/util/AbstractTestBase.java | 214 +++++++++++++++++- .../MigrateDatabaseProcedureITCase.java | 18 +- .../procedure/MigrateFileProcedureITCase.java | 8 +- .../MigrateTableProcedureITCase.java | 18 +- 23 files changed, 401 insertions(+), 255 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index 7a621030217a..63b42e627611 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -36,7 +36,6 @@ import org.apache.paimon.types.RowType; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.AfterEach; @@ -66,10 +65,12 @@ public class CdcActionITCaseBase extends ActionITCaseBase { @BeforeEach public void setEnv() { - env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); - env.enableCheckpointing(1000); - env.setRestartStrategy(RestartStrategies.noRestart()); + env = + streamExecutionEnvironmentBuilder() + .streamingMode() + .parallelism(2) + .checkpointIntervalMs(1000) + .build(); } @AfterEach diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java index fdcb99f33b94..e0724d13a693 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java @@ -20,7 +20,6 @@ import org.apache.paimon.flink.util.AbstractTestBase; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -103,9 +102,8 @@ protected void doStart() { @BeforeEach public void setup() { - env = StreamExecutionEnvironment.getExecutionEnvironment(); + env = streamExecutionEnvironmentBuilder().streamingMode().build(); tEnv = StreamTableEnvironment.create(env); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); tEnv.getConfig() .getConfiguration() .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java index d3369cef3344..a7c6b2cb6323 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java @@ -42,7 +42,6 @@ import org.apache.paimon.utils.FailingFileIO; import org.apache.paimon.utils.TraceableFileIO; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; @@ -148,12 +147,12 @@ private void innerTestRandomCdcEvents(Supplier bucket, boolean unawareB } List events = mergeTestTableEvents(testTables); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getCheckpointConfig().setCheckpointInterval(100); - if (!enableFailure) { - env.setRestartStrategy(RestartStrategies.noRestart()); - } + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(100) + .allowRestart(enableFailure) + .build(); TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(events); DataStreamSource source = env.addSource(sourceFunction); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java index 57a7604c00bb..081bd7d073d7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java @@ -43,7 +43,6 @@ import org.apache.paimon.utils.FailingFileIO; import org.apache.paimon.utils.TraceableFileIO; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Disabled; @@ -145,12 +144,12 @@ private void innerTestRandomCdcEvents( Collections.singletonList("pt"), primaryKeys, numBucket); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getCheckpointConfig().setCheckpointInterval(100); - if (!enableFailure) { - env.setRestartStrategy(RestartStrategies.noRestart()); - } + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(100) + .allowRestart(enableFailure) + .build(); TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(testTable.events()); DataStreamSource source = env.addSource(sourceFunction); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java index f8b41d1414f7..99f3d4643cbb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncLookupFunctionWrapper.java @@ -50,12 +50,17 @@ public void open(FunctionContext context) throws Exception { } private Collection lookup(RowData keyRow) { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread() + .setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader()); try { synchronized (function) { return function.lookup(keyRow); } } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + Thread.currentThread().setContextClassLoader(cl); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index 31b3ffdffcb7..b5ff3b30448d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -29,7 +29,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; @@ -51,7 +50,6 @@ import java.io.File; import java.io.IOException; -import java.time.Duration; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -60,8 +58,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; - /** ITCase for catalog. */ public abstract class CatalogITCaseBase extends AbstractTestBase { @@ -71,7 +67,7 @@ public abstract class CatalogITCaseBase extends AbstractTestBase { @BeforeEach public void before() throws IOException { - tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv = tableEnvironmentBuilder().batchMode().build(); String catalog = "PAIMON"; path = getTempDirPath(); String inferScan = @@ -89,8 +85,7 @@ public void before() throws IOException { .collect(Collectors.joining(",")))); tEnv.useCatalog(catalog); - sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); - sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100)); + sEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build(); sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get()); sEnv.useCatalog(catalog); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java index b7c739e96e68..6b68532a25b9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java @@ -35,7 +35,6 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.FailingFileIO; -import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; @@ -396,19 +395,16 @@ private static RowData srcRow(RowKind kind, int v, String p, int k) { return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p), k)); } - public static StreamExecutionEnvironment buildStreamEnv() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.enableCheckpointing(100); - env.setParallelism(2); - return env; + public StreamExecutionEnvironment buildStreamEnv() { + return streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(100) + .parallelism(2) + .build(); } - public static StreamExecutionEnvironment buildBatchEnv() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - env.setParallelism(2); - return env; + public StreamExecutionEnvironment buildBatchEnv() { + return streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build(); } public static FileStoreTable buildFileStoreTable( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java index 50d28cd112fc..451e4c78bcec 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java @@ -28,10 +28,8 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.utils.BlockingIterator; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.BeforeEach; @@ -55,18 +53,16 @@ public class FileSystemCatalogITCase extends AbstractTestBase { private static final String DB_NAME = "default"; private String path; - private StreamTableEnvironment tEnv; + private TableEnvironment tEnv; @BeforeEach public void setup() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(1); - - tEnv = StreamTableEnvironment.create(env); - tEnv.getConfig() - .getConfiguration() - .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false); + tEnv = + tableEnvironmentBuilder() + .streamingMode() + .parallelism(1) + .setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false) + .build(); path = getTempDirPath(); tEnv.executeSql( String.format("CREATE CATALOG fs WITH ('type'='paimon', 'warehouse'='%s')", path)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java index c6cc77bfb5e7..b7da6cd95802 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java @@ -22,11 +22,8 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -59,7 +56,7 @@ public abstract class FlinkTestBase extends AbstractTestBase { protected ExpectedResult expectedResult; protected boolean ignoreException; - protected StreamTableEnvironment tEnv; + protected TableEnvironment tEnv; protected String rootPath; protected ResolvedCatalogTable resolvedTable = @@ -79,14 +76,11 @@ protected void prepareEnv( this.ignoreException = ignoreException; this.expectedResult = expectedResult; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().inBatchMode(); if (executionMode == RuntimeExecutionMode.STREAMING) { - env.enableCheckpointing(100); - builder.inStreamingMode(); + tEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build(); + } else { + tEnv = tableEnvironmentBuilder().batchMode().build(); } - tEnv = StreamTableEnvironment.create(env, builder.build()); rootPath = getTempDirPath(); tEnv.executeSql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index a11e7887ce98..26982e44aa7f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -596,7 +596,7 @@ public void testAsyncRetryLookup(LookupCacheMode cacheMode) throws Exception { String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss'," - + " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */" + + " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='30') */" + " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i"; BlockingIterator iterator = BlockingIterator.of(sEnv.executeSql(query).collect()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java index 0d3c5f0522eb..fb011bb11115 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MappingTableITCase.java @@ -22,7 +22,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.ValidationException; import org.apache.flink.types.Row; @@ -44,7 +43,7 @@ public class MappingTableITCase extends AbstractTestBase { @BeforeEach public void before() throws IOException { - tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv = tableEnvironmentBuilder().batchMode().build(); path = getTempDirPath(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 8872cc333962..e495ad3da5d4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -26,11 +26,8 @@ import org.apache.paimon.utils.FailingFileIO; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -42,7 +39,6 @@ import org.junit.jupiter.api.Timeout; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,7 +49,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; /** Tests for changelog table with primary keys. */ @@ -76,32 +71,6 @@ public void before() throws IOException { } } - private TableEnvironment createBatchTableEnvironment() { - return TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); - } - - private TableEnvironment createStreamingTableEnvironment(int checkpointIntervalMs) { - TableEnvironment sEnv = - TableEnvironment.create( - EnvironmentSettings.newInstance().inStreamingMode().build()); - // set checkpoint interval to a random number to emulate different speed of commit - sEnv.getConfig() - .getConfiguration() - .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(checkpointIntervalMs)); - sEnv.getConfig() - .set( - ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, - ExecutionConfigOptions.UpsertMaterialize.NONE); - return sEnv; - } - - private StreamExecutionEnvironment createStreamExecutionEnvironment(int checkpointIntervalMs) { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs); - return env; - } - private String createCatalogSql(String catalogName, String warehouse) { String defaultPropertyString = ""; if (tableDefaultProperties.size() > 0) { @@ -138,10 +107,7 @@ public void testFullCompactionTriggerInterval() throws Exception { @Timeout(1200) public void testFullCompactionWithLongCheckpointInterval() throws Exception { // create table - TableEnvironment bEnv = createBatchTableEnvironment(); - bEnv.getConfig() - .getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build(); bEnv.executeSql(createCatalogSql("testCatalog", path)); bEnv.executeSql("USE CATALOG testCatalog"); bEnv.executeSql( @@ -156,18 +122,23 @@ public void testFullCompactionWithLongCheckpointInterval() throws Exception { + ")"); // run select job - TableEnvironment sEnv = createStreamingTableEnvironment(100); - sEnv.getConfig() - .getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(100) + .parallelism(1) + .build(); sEnv.executeSql(createCatalogSql("testCatalog", path)); sEnv.executeSql("USE CATALOG testCatalog"); CloseableIterator it = sEnv.executeSql("SELECT * FROM T").collect(); // run compact job - StreamExecutionEnvironment env = createStreamExecutionEnvironment(2000); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(2000) + .build(); env.setParallelism(1); - env.setRestartStrategy(RestartStrategies.noRestart()); new CompactAction(path, "default", "T").withStreamExecutionEnvironment(env).build(); JobClient client = env.executeAsync(); @@ -199,10 +170,11 @@ public void testLookupChangelog() throws Exception { private void innerTestChangelogProducing(List options) throws Exception { TableEnvironment sEnv = - createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100); - sEnv.getConfig() - .getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100) + .parallelism(1) + .build(); sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); sEnv.executeSql("USE CATALOG testCatalog"); @@ -273,7 +245,7 @@ private void innerTestChangelogProducing(List options) throws Exception @Test @Timeout(1200) public void testNoChangelogProducerBatchRandom() throws Exception { - TableEnvironment bEnv = createBatchTableEnvironment(); + TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); testNoChangelogProducerRandom(bEnv, 1, false); } @@ -281,14 +253,19 @@ public void testNoChangelogProducerBatchRandom() throws Exception { @Timeout(1200) public void testNoChangelogProducerStreamingRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); - TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100); + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(random.nextInt(900) + 100) + .allowRestart() + .build(); testNoChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean()); } @Test @Timeout(1200) public void testFullCompactionChangelogProducerBatchRandom() throws Exception { - TableEnvironment bEnv = createBatchTableEnvironment(); + TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); testFullCompactionChangelogProducerRandom(bEnv, 1, false); } @@ -296,7 +273,12 @@ public void testFullCompactionChangelogProducerBatchRandom() throws Exception { @Timeout(1200) public void testFullCompactionChangelogProducerStreamingRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); - TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100); + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(random.nextInt(900) + 100) + .allowRestart() + .build(); testFullCompactionChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean()); } @@ -304,14 +286,19 @@ public void testFullCompactionChangelogProducerStreamingRandom() throws Exceptio @Timeout(1200) public void testStandAloneFullCompactJobRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); - TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100); + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(random.nextInt(900) + 100) + .allowRestart() + .build(); testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean()); } @Test @Timeout(1200) public void testLookupChangelogProducerBatchRandom() throws Exception { - TableEnvironment bEnv = createBatchTableEnvironment(); + TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); testLookupChangelogProducerRandom(bEnv, 1, false); } @@ -319,7 +306,12 @@ public void testLookupChangelogProducerBatchRandom() throws Exception { @Timeout(1200) public void testLookupChangelogProducerStreamingRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); - TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100); + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(random.nextInt(900) + 100) + .allowRestart() + .build(); testLookupChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean()); } @@ -327,7 +319,12 @@ public void testLookupChangelogProducerStreamingRandom() throws Exception { @Timeout(1200) public void testStandAloneLookupJobRandom() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); - TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100); + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(random.nextInt(900) + 100) + .allowRestart() + .build(); testStandAloneLookupJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean()); } @@ -433,8 +430,12 @@ private void testStandAloneFullCompactJobRandom( for (int i = enableConflicts ? 2 : 1; i > 0; i--) { StreamExecutionEnvironment env = - createStreamExecutionEnvironment(random.nextInt(1900) + 100); - env.setParallelism(2); + streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(random.nextInt(1900) + 100) + .parallelism(2) + .allowRestart() + .build(); new CompactAction(path, "default", "T").withStreamExecutionEnvironment(env).build(); env.executeAsync(); } @@ -469,7 +470,11 @@ private void testStandAloneLookupJobRandom( for (int i = enableConflicts ? 2 : 1; i > 0; i--) { StreamExecutionEnvironment env = - createStreamExecutionEnvironment(random.nextInt(1900) + 100); + streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(random.nextInt(1900) + 100) + .allowRestart() + .build(); env.setParallelism(2); new CompactAction(path, "default", "T").withStreamExecutionEnvironment(env).build(); env.executeAsync(); @@ -483,10 +488,12 @@ private void testStandAloneLookupJobRandom( } private void checkChangelogTestResult(int numProducers) throws Exception { - TableEnvironment sEnv = createStreamingTableEnvironment(100); - sEnv.getConfig() - .getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(100) + .parallelism(1) + .build(); sEnv.executeSql(createCatalogSql("testCatalog", path)); sEnv.executeSql("USE CATALOG testCatalog"); @@ -601,7 +608,7 @@ private List testRandom( } private void checkBatchResult(int numProducers) throws Exception { - TableEnvironment bEnv = createBatchTableEnvironment(); + TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build(); bEnv.executeSql(createCatalogSql("testCatalog", path)); bEnv.executeSql("USE CATALOG testCatalog"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index 80cf2cc76711..579237756994 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -36,20 +36,12 @@ import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -148,20 +140,18 @@ protected List getResult(TableRead read, List splits, RowType row } } - protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); - - if (isStreaming) { - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - env.getCheckpointConfig().setCheckpointInterval(500); - } else { - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - } + @Override + protected TableEnvironmentBuilder tableEnvironmentBuilder() { + return super.tableEnvironmentBuilder() + .checkpointIntervalMs(500) + .parallelism(ThreadLocalRandom.current().nextInt(2) + 1); + } - return env; + @Override + protected StreamExecutionEnvironmentBuilder streamExecutionEnvironmentBuilder() { + return super.streamExecutionEnvironmentBuilder() + .checkpointIntervalMs(500) + .parallelism(ThreadLocalRandom.current().nextInt(2) + 1); } protected T createAction(Class clazz, List args) { @@ -195,17 +185,11 @@ protected void callProcedure(String procedureStatement) { } protected void callProcedure(String procedureStatement, boolean isStreaming, boolean dmlSync) { - StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); - TableEnvironment tEnv; if (isStreaming) { - tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); - tEnv.getConfig() - .set( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, - Duration.ofMillis(500)); + tEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(500).build(); } else { - tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + tEnv = tableEnvironmentBuilder().batchMode().build(); } tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java index d0f10766e881..06c29456cfa0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java @@ -272,7 +272,12 @@ private void checkLatestSnapshot( } private void runAction(boolean isStreaming) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(isStreaming); + StreamExecutionEnvironment env; + if (isStreaming) { + env = streamExecutionEnvironmentBuilder().streamingMode().build(); + } else { + env = streamExecutionEnvironmentBuilder().batchMode().build(); + } CompactAction action = createAction( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index d7104f591893..4fa1b3c53bd1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -133,7 +133,8 @@ public void testBatchCompact(String mode) throws Exception { } if (ThreadLocalRandom.current().nextBoolean()) { - StreamExecutionEnvironment env = buildDefaultEnv(false); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().batchMode().build(); createAction( CompactDatabaseAction.class, "compact_database", @@ -237,7 +238,8 @@ public void testStreamingCompact(String mode) throws Exception { "--table_conf", CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s"); } - StreamExecutionEnvironment env = buildDefaultEnv(true); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); action.withStreamExecutionEnvironment(env).build(); env.executeAsync(); } else { @@ -513,7 +515,8 @@ private void includingAndExcludingTablesImpl( args.add(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s"); } - StreamExecutionEnvironment env = buildDefaultEnv(false); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().batchMode().build(); createAction(CompactDatabaseAction.class, args) .withStreamExecutionEnvironment(env) .build(); @@ -615,7 +618,8 @@ public void testUnawareBucketStreamingCompact() throws Exception { } if (ThreadLocalRandom.current().nextBoolean()) { - StreamExecutionEnvironment env = buildDefaultEnv(true); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse) .withStreamExecutionEnvironment(env) .build(); @@ -688,7 +692,8 @@ public void testUnawareBucketBatchCompact() throws Exception { } if (ThreadLocalRandom.current().nextBoolean()) { - StreamExecutionEnvironment env = buildDefaultEnv(false); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().batchMode().build(); createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse) .withStreamExecutionEnvironment(env) .build(); @@ -747,7 +752,8 @@ public void testCombinedModeWithDynamicOptions() throws Exception { "--table_conf", CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key() + "=3"); - StreamExecutionEnvironment env = buildDefaultEnv(true); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); action.withStreamExecutionEnvironment(env).build(); JobClient jobClient = env.executeAsync(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java index 68c3c42b0a50..f81c253cc99a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java @@ -48,7 +48,6 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; @@ -117,8 +116,7 @@ public void testCompact() throws Exception { write.close(); commit.close(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); + StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build(); CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(tablePath.toString(), table); DataStreamSource source = @@ -152,7 +150,8 @@ public void testCompact() throws Exception { public void testCompactParallelism() throws Exception { FileStoreTable table = createFileStoreTable(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(tablePath.toString(), table); DataStreamSource source = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java index 6d35351d87c0..fe56eae647a8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java @@ -24,18 +24,12 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; @@ -43,7 +37,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -136,33 +129,19 @@ private JobClient runRecoverFromSavepointJob(String failingPath, String savepoin SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf); } - EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); - tEnv.getConfig() - .getConfiguration() - .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500)); - tEnv.getConfig().getConfiguration().set(StateBackendOptions.STATE_BACKEND, "filesystem"); - tEnv.getConfig() - .getConfiguration() - .set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + path + "/checkpoint"); - // input data must be strictly ordered for us to check changelog results - tEnv.getConfig() - .getConfiguration() - .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); - tEnv.getConfig() - .getConfiguration() - .set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); - tEnv.getConfig() - .getConfiguration() - .set( - RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, - Integer.MAX_VALUE); - tEnv.getConfig() - .getConfiguration() - .set( - RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, - Duration.ofSeconds(1)); + TableEnvironment tEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(500) + // input data must be strictly ordered for us to check changelog results + .parallelism(1) + .allowRestart() + .setConf(conf) + .setConf(StateBackendOptions.STATE_BACKEND, "filesystem") + .setConf( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + "file://" + path + "/checkpoint") + .build(); String createCatalogSql = String.join( @@ -219,9 +198,7 @@ private JobClient runRecoverFromSavepointJob(String failingPath, String savepoin } private void checkRecoverFromSavepointBatchResult() throws Exception { - EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); - TableEnvironment tEnv = TableEnvironment.create(settings); - + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql( String.join( "\n", @@ -248,9 +225,7 @@ private void checkRecoverFromSavepointBatchResult() throws Exception { } private void checkRecoverFromSavepointStreamingResult() throws Exception { - EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); - TableEnvironment tEnv = TableEnvironment.create(settings); - + TableEnvironment tEnv = tableEnvironmentBuilder().streamingMode().build(); tEnv.executeSql( String.join( "\n", diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java index 2a95b5d589aa..3164dd412504 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java @@ -103,7 +103,8 @@ public void testBatchRead(boolean defaultOptions) throws Exception { write.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15)); commit.commit(1, write.prepareCommit(true, 1)); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); DataStreamSource compactorSource = new CompactorSourceBuilder("test", table) .withContinuousMode(false) @@ -164,7 +165,8 @@ public void testStreamingRead(boolean defaultOptions) throws Exception { write.write(rowData(2, 1620, BinaryString.fromString("20221209"), 16)); commit.commit(3, write.prepareCommit(true, 3)); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); DataStreamSource compactorSource = new CompactorSourceBuilder("test", table) .withContinuousMode(true) @@ -257,7 +259,8 @@ private void testPartitionSpec( write.write(rowData(1, 1510, BinaryString.fromString("20221209"), 15)); commit.commit(2, write.prepareCommit(true, 2)); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); DataStreamSource compactorSource = new CompactorSourceBuilder("test", table) .withContinuousMode(isStreaming) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java index 56e9374d5214..5555856035f1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java @@ -41,7 +41,6 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; -import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; @@ -156,9 +155,11 @@ public void testBatchRead(boolean defaultOptions) throws Exception { } } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); - env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder() + .batchMode() + .parallelism(ThreadLocalRandom.current().nextInt(2) + 1) + .build(); DataStream source = new MultiTablesCompactorSourceBuilder( catalogLoader(), @@ -254,7 +255,8 @@ public void testStreamingRead(boolean defaultOptions) throws Exception { } } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); DataStream compactorSource = new MultiTablesCompactorSourceBuilder( catalogLoader(), @@ -423,7 +425,8 @@ public void testIncludeAndExcludeTableRead(boolean defaultOptions) throws Except } } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); DataStream compactorSource = new MultiTablesCompactorSourceBuilder( catalogLoader(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java index 0850d6197f54..d3efddecc567 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java @@ -20,9 +20,19 @@ import org.apache.paimon.utils.FileIOUtils; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; @@ -30,12 +40,13 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.UUID; /** Similar to Flink's AbstractTestBase but using Junit5. */ public class AbstractTestBase { - private static final int DEFAULT_PARALLELISM = 8; + private static final int DEFAULT_PARALLELISM = 16; @RegisterExtension protected static final MiniClusterWithClientExtension MINI_CLUSTER_EXTENSION = @@ -92,4 +103,205 @@ protected File createAndRegisterTempFile(String fileName) { return new File( temporaryFolder.toFile(), String.format("%s/%s", UUID.randomUUID(), fileName)); } + + // ---------------------------------------------------------------------------------------------------------------- + // Table Environment Utilities + // ---------------------------------------------------------------------------------------------------------------- + + protected TableEnvironmentBuilder tableEnvironmentBuilder() { + return new TableEnvironmentBuilder(); + } + + /** Builder for {@link TableEnvironmentBuilder} in tests. */ + protected static class TableEnvironmentBuilder { + + private boolean streamingMode = true; + private Integer parallelism = null; + private Integer checkpointIntervalMs = null; + private boolean allowRestart = false; + private Configuration conf = new Configuration(); + + public TableEnvironmentBuilder batchMode() { + this.streamingMode = false; + return this; + } + + public TableEnvironmentBuilder streamingMode() { + this.streamingMode = true; + return this; + } + + public TableEnvironmentBuilder parallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public TableEnvironmentBuilder checkpointIntervalMs(int checkpointIntervalMs) { + this.checkpointIntervalMs = checkpointIntervalMs; + return this; + } + + public TableEnvironmentBuilder allowRestart() { + this.allowRestart = true; + return this; + } + + public TableEnvironmentBuilder allowRestart(boolean allowRestart) { + this.allowRestart = allowRestart; + return this; + } + + public TableEnvironmentBuilder setConf(ConfigOption option, T value) { + conf.set(option, value); + return this; + } + + public TableEnvironmentBuilder setConf(Configuration conf) { + this.conf.addAll(conf); + return this; + } + + public TableEnvironment build() { + TableEnvironment tEnv; + if (streamingMode) { + tEnv = + TableEnvironment.create( + EnvironmentSettings.newInstance().inStreamingMode().build()); + tEnv.getConfig() + .set( + ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, + ExecutionConfigOptions.UpsertMaterialize.NONE); + if (checkpointIntervalMs != null) { + tEnv.getConfig() + .getConfiguration() + .set( + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, + Duration.ofMillis(checkpointIntervalMs)); + } + } else { + tEnv = + TableEnvironment.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + } + + if (parallelism != null) { + tEnv.getConfig() + .getConfiguration() + .set( + ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, + parallelism); + } + + if (allowRestart) { + tEnv.getConfig() + .getConfiguration() + .set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + tEnv.getConfig() + .getConfiguration() + .set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, + Integer.MAX_VALUE); + tEnv.getConfig() + .getConfiguration() + .set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, + Duration.ofSeconds(1)); + } else { + tEnv.getConfig() + .getConfiguration() + .set(RestartStrategyOptions.RESTART_STRATEGY, "disable"); + } + + tEnv.getConfig().getConfiguration().addAll(conf); + + return tEnv; + } + } + + // ---------------------------------------------------------------------------------------------------------------- + // Stream Execution Environment Utilities + // ---------------------------------------------------------------------------------------------------------------- + + protected StreamExecutionEnvironmentBuilder streamExecutionEnvironmentBuilder() { + return new StreamExecutionEnvironmentBuilder(); + } + + /** Builder for {@link StreamExecutionEnvironment} in tests. */ + protected static class StreamExecutionEnvironmentBuilder { + + private boolean streamingMode = true; + private Integer parallelism = null; + private Integer checkpointIntervalMs = null; + private boolean allowRestart = false; + private Configuration conf = new Configuration(); + + public StreamExecutionEnvironmentBuilder batchMode() { + this.streamingMode = false; + return this; + } + + public StreamExecutionEnvironmentBuilder streamingMode() { + this.streamingMode = true; + return this; + } + + public StreamExecutionEnvironmentBuilder parallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public StreamExecutionEnvironmentBuilder checkpointIntervalMs(int checkpointIntervalMs) { + this.checkpointIntervalMs = checkpointIntervalMs; + return this; + } + + public StreamExecutionEnvironmentBuilder allowRestart() { + this.allowRestart = true; + return this; + } + + public StreamExecutionEnvironmentBuilder allowRestart(boolean allowRestart) { + this.allowRestart = allowRestart; + return this; + } + + public StreamExecutionEnvironmentBuilder setConf(ConfigOption option, T value) { + conf.set(option, value); + return this; + } + + public StreamExecutionEnvironment build() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + if (streamingMode) { + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + if (checkpointIntervalMs != null) { + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setCheckpointInterval(checkpointIntervalMs); + } + } else { + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + } + + if (parallelism != null) { + env.setParallelism(parallelism); + } + + Configuration conf = new Configuration(); + if (allowRestart) { + conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + conf.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, + Integer.MAX_VALUE); + conf.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, + Duration.ofSeconds(1)); + } else { + conf.set(RestartStrategyOptions.RESTART_STRATEGY, "disable"); + } + conf.addAll(this.conf); + env.configure(conf); + + return env; + } + } } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java index 70345e4f8026..ff02a352e248 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java @@ -25,11 +25,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.assertj.core.api.Assertions; @@ -90,10 +87,7 @@ private void resetMetastore() throws Exception { } public void testUpgradePartitionTable(String format) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(false); - - TableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); @@ -149,10 +143,7 @@ public void testUpgradePartitionTable(String format) throws Exception { } public void testUpgradeNonPartitionTable(String format) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(false); - - TableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); @@ -208,10 +199,7 @@ public void testUpgradeNonPartitionTable(String format) throws Exception { @ParameterizedTest @ValueSource(strings = {"orc", "parquet", "avro"}) public void testMigrateDatabaseAction(String format) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(false); - - TableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index b41a187d7d94..b71a38c59cee 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -24,11 +24,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.assertj.core.api.Assertions; @@ -72,10 +69,7 @@ public void testParquet() throws Exception { } public void test(String format) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(false); - - TableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 9c727ba1e75c..634f10a60be7 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -25,11 +25,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.assertj.core.api.Assertions; @@ -89,10 +86,7 @@ private void resetMetastore() throws Exception { } public void testUpgradePartitionTable(String format) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(false); - - TableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); @@ -125,10 +119,7 @@ public void testUpgradePartitionTable(String format) throws Exception { } public void testUpgradeNonPartitionTable(String format) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(false); - - TableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); @@ -161,10 +152,7 @@ public void testUpgradeNonPartitionTable(String format) throws Exception { @ParameterizedTest @ValueSource(strings = {"orc", "parquet", "avro"}) public void testMigrateAction(String format) throws Exception { - StreamExecutionEnvironment env = buildDefaultEnv(false); - - TableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);