[flink] Set default recover strategy for TableEnvironment and StreamExecutionEnvironment in tests (apache#3133)

tsreaper authored Apr 10, 2024
1 parent 82144ef commit 3ca6fc0
Showing 23 changed files with 401 additions and 255 deletions.
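
The hunks below replace per-test environment setup with two shared test helpers, streamExecutionEnvironmentBuilder() and tableEnvironmentBuilder(), whose implementations are not included in this view. As a rough sketch only: assuming the stream-side builder simply centralizes the settings the deleted lines used to apply per test (builder method names are taken from the call sites in the diff, everything else is hypothetical), it might look like this:

// Hypothetical sketch; the real helper lives in the shared test base classes and is not shown here.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamExecutionEnvironmentBuilder {

    private RuntimeExecutionMode mode = RuntimeExecutionMode.STREAMING;
    private Integer parallelism;
    private Long checkpointIntervalMs;
    private boolean allowRestart; // defaults to false: tests fail fast instead of restarting

    public StreamExecutionEnvironmentBuilder streamingMode() {
        this.mode = RuntimeExecutionMode.STREAMING;
        return this;
    }

    public StreamExecutionEnvironmentBuilder batchMode() {
        this.mode = RuntimeExecutionMode.BATCH;
        return this;
    }

    public StreamExecutionEnvironmentBuilder parallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public StreamExecutionEnvironmentBuilder checkpointIntervalMs(long intervalMs) {
        this.checkpointIntervalMs = intervalMs;
        return this;
    }

    public StreamExecutionEnvironmentBuilder allowRestart(boolean allowRestart) {
        this.allowRestart = allowRestart;
        return this;
    }

    public StreamExecutionEnvironment build() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(mode);
        if (parallelism != null) {
            env.setParallelism(parallelism);
        }
        if (checkpointIntervalMs != null) {
            env.enableCheckpointing(checkpointIntervalMs);
        }
        if (!allowRestart) {
            // The "default recover strategy" from the commit title: no restarts in tests
            // unless a test explicitly opts in (e.g. failure-injection tests).
            env.setRestartStrategy(RestartStrategies.noRestart());
        }
        return env;
    }
}

Under that assumption, a call such as streamExecutionEnvironmentBuilder().streamingMode().parallelism(2).checkpointIntervalMs(1000).build() reproduces the setup the first hunk deletes, plus the no-restart default.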
Changed file: CdcActionITCaseBase.java
@@ -36,7 +36,6 @@
 import org.apache.paimon.types.RowType;

 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.AfterEach;
@@ -66,10 +65,12 @@ public class CdcActionITCaseBase extends ActionITCaseBase {

     @BeforeEach
     public void setEnv() {
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
+        env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(2)
+                        .checkpointIntervalMs(1000)
+                        .build();
     }

     @AfterEach
Changed file (path not shown in this view)
@@ -20,7 +20,6 @@

 import org.apache.paimon.flink.util.AbstractTestBase;

-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -103,9 +102,8 @@ protected void doStart() {

     @BeforeEach
     public void setup() {
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env = streamExecutionEnvironmentBuilder().streamingMode().build();
         tEnv = StreamTableEnvironment.create(env);
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
Changed file (path not shown in this view)
@@ -42,7 +42,6 @@
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;

-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Test;
@@ -148,12 +147,12 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean unawareB
         }

         List<TestCdcEvent> events = mergeTestTableEvents(testTables);
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getCheckpointConfig().setCheckpointInterval(100);
-        if (!enableFailure) {
-            env.setRestartStrategy(RestartStrategies.noRestart());
-        }
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .allowRestart(enableFailure)
+                        .build();

         TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(events);
         DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
Changed file (path not shown in this view)
@@ -43,7 +43,6 @@
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;

-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Disabled;
@@ -145,12 +144,12 @@ private void innerTestRandomCdcEvents(
                         Collections.singletonList("pt"),
                         primaryKeys,
                         numBucket);
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getCheckpointConfig().setCheckpointInterval(100);
-        if (!enableFailure) {
-            env.setRestartStrategy(RestartStrategies.noRestart());
-        }
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .allowRestart(enableFailure)
+                        .build();

         TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(testTable.events());
         DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
Changed file: AsyncLookupFunctionWrapper.java
@@ -50,12 +50,17 @@ public void open(FunctionContext context) throws Exception {
     }

     private Collection<RowData> lookup(RowData keyRow) {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread()
+                .setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
         try {
             synchronized (function) {
                 return function.lookup(keyRow);
             }
         } catch (IOException e) {
             throw new UncheckedIOException(e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(cl);
         }
     }

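The hunk above pins the wrapper's own classloader for the duration of the synchronous lookup and restores the caller's loader in a finally block. The same save/set/restore pattern, extracted into a small helper for illustration (hypothetical class, not part of this commit):

import java.util.concurrent.Callable;

// Hypothetical helper illustrating the save/set/restore pattern used above.
public final class WithContextClassLoader {

    private WithContextClassLoader() {}

    public static <T> T run(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // Always restore the caller's loader, even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }
}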
Changed file: CatalogITCaseBase.java
@@ -29,7 +29,6 @@

 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -51,7 +50,6 @@

 import java.io.File;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -60,8 +58,6 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;

-import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-
 /** ITCase for catalog. */
 public abstract class CatalogITCaseBase extends AbstractTestBase {

@@ -71,7 +67,7 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {

     @BeforeEach
     public void before() throws IOException {
-        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv = tableEnvironmentBuilder().batchMode().build();
         String catalog = "PAIMON";
         path = getTempDirPath();
         String inferScan =
@@ -89,8 +85,7 @@ public void before() throws IOException {
                                 .collect(Collectors.joining(","))));
         tEnv.useCatalog(catalog);

-        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
-        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        sEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
         sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
         sEnv.useCatalog(catalog);

Changed file (path not shown in this view)
@@ -35,7 +35,6 @@
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.FailingFileIO;

-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -396,19 +395,16 @@ private static RowData srcRow(RowKind kind, int v, String p, int k) {
         return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p), k));
     }

-    public static StreamExecutionEnvironment buildStreamEnv() {
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        env.enableCheckpointing(100);
-        env.setParallelism(2);
-        return env;
+    public StreamExecutionEnvironment buildStreamEnv() {
+        return streamExecutionEnvironmentBuilder()
+                .streamingMode()
+                .checkpointIntervalMs(100)
+                .parallelism(2)
+                .build();
     }

-    public static StreamExecutionEnvironment buildBatchEnv() {
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(2);
-        return env;
+    public StreamExecutionEnvironment buildBatchEnv() {
+        return streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
     }

     public static FileStoreTable buildFileStoreTable(
Changed file: FileSystemCatalogITCase.java
@@ -28,10 +28,8 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.BlockingIterator;

-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
@@ -55,18 +53,16 @@ public class FileSystemCatalogITCase extends AbstractTestBase {
     private static final String DB_NAME = "default";

     private String path;
-    private StreamTableEnvironment tEnv;
+    private TableEnvironment tEnv;

     @BeforeEach
     public void setup() {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-        env.setParallelism(1);
-
-        tEnv = StreamTableEnvironment.create(env);
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+        tEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(1)
+                        .setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
+                        .build();
         path = getTempDirPath();
         tEnv.executeSql(
                 String.format("CREATE CATALOG fs WITH ('type'='paimon', 'warehouse'='%s')", path));
Changed file: FlinkTestBase.java
@@ -22,11 +22,8 @@

 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -59,7 +56,7 @@ public abstract class FlinkTestBase extends AbstractTestBase {
     protected ExpectedResult expectedResult;
     protected boolean ignoreException;

-    protected StreamTableEnvironment tEnv;
+    protected TableEnvironment tEnv;
     protected String rootPath;

     protected ResolvedCatalogTable resolvedTable =
@@ -79,14 +76,11 @@ protected void prepareEnv(
         this.ignoreException = ignoreException;
         this.expectedResult = expectedResult;

-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-        EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().inBatchMode();
         if (executionMode == RuntimeExecutionMode.STREAMING) {
-            env.enableCheckpointing(100);
-            builder.inStreamingMode();
+            tEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).build();
+        } else {
+            tEnv = tableEnvironmentBuilder().batchMode().build();
         }
-        tEnv = StreamTableEnvironment.create(env, builder.build());
         rootPath = getTempDirPath();

         tEnv.executeSql(
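For the table-side helper, the lines deleted above (EnvironmentSettings.newInstance().inBatchMode()/inStreamingMode() and the CHECKPOINTING_INTERVAL option set through the configuration) suggest a builder along these lines. Again only a sketch under the assumption that it mirrors the removed per-test setup; setConf, the no-restart default, and the parallelism option used here are guesses:

import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

// Hypothetical sketch of tableEnvironmentBuilder(); the real helper is not part of this diff.
public class TableEnvironmentBuilder {

    private boolean streaming = true;
    private Integer parallelism;
    private Long checkpointIntervalMs;
    private final Configuration conf = new Configuration();

    public TableEnvironmentBuilder streamingMode() {
        this.streaming = true;
        return this;
    }

    public TableEnvironmentBuilder batchMode() {
        this.streaming = false;
        return this;
    }

    public TableEnvironmentBuilder parallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public TableEnvironmentBuilder checkpointIntervalMs(long intervalMs) {
        this.checkpointIntervalMs = intervalMs;
        return this;
    }

    public <T> TableEnvironmentBuilder setConf(ConfigOption<T> option, T value) {
        conf.set(option, value);
        return this;
    }

    public TableEnvironment build() {
        EnvironmentSettings settings =
                streaming
                        ? EnvironmentSettings.newInstance().inStreamingMode().build()
                        : EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Configuration tableConf = tEnv.getConfig().getConfiguration();
        // Assumed default recovery behavior for tests: no job restarts unless overridden.
        tableConf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
        if (parallelism != null) {
            tableConf.set(
                    ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
        }
        if (checkpointIntervalMs != null) {
            tableConf.set(
                    ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                    Duration.ofMillis(checkpointIntervalMs));
        }
        tableConf.addAll(conf);
        return tEnv;
    }
}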
Changed file (path not shown in this view)
@@ -596,7 +596,7 @@ public void testAsyncRetryLookup(LookupCacheMode cacheMode) throws Exception {

         String query =
                 "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss',"
-                        + " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */"
+                        + " 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='30') */"
                         + " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
         BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

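The only change in the hint above is max-attempts 60 → 30; with the fixed-delay of 3 s unchanged, the worst-case retry window for a lookup miss shrinks from roughly 3 s × 60 = 180 s to 3 s × 30 = 90 s.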
Changed file: MappingTableITCase.java
@@ -22,7 +22,6 @@

 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.types.Row;
@@ -44,7 +43,7 @@ public class MappingTableITCase extends AbstractTestBase {

     @BeforeEach
     public void before() throws IOException {
-        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv = tableEnvironmentBuilder().batchMode().build();
         path = getTempDirPath();
     }

(Diff for the remaining changed files was not loaded in this view.)