Submit CDC Job To Flink K8S Native Application Mode
czy006 committed Mar 8, 2024
1 parent 1dc201f commit 5d0f014
Showing 12 changed files with 276 additions and 34 deletions.
16 changes: 16 additions & 0 deletions Dockerfile
@@ -0,0 +1,16 @@
FROM flink
RUN mkdir -p /opt/flink-cdc
ENV FLINK_CDC_HOME /opt/flink-cdc
COPY flink-cdc-dist/target/flink-cdc-3.0-SNAPSHOT-bin.tar.gz /tmp/
RUN tar -xzvf /tmp/flink-cdc-3.0-SNAPSHOT-bin.tar.gz -C /tmp/ && \
mv /tmp/flink-cdc-3.0-SNAPSHOT/* $FLINK_CDC_HOME/ && \
rm -rf /tmp/flink-cdc-3.0-SNAPSHOT /tmp/flink-cdc-3.0-SNAPSHOT-bin.tar.gz
# Copy connector JARs into the CDC lib directory
#RUN wget -P $FLINK_CDC_HOME/lib https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar
#RUN wget -P $FLINK_CDC_HOME/lib https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar
COPY flink-cdc-pipeline-connector-doris-3.0-SNAPSHOT.jar $FLINK_HOME/lib
COPY flink-cdc-pipeline-connector-mysql-3.0-SNAPSHOT.jar $FLINK_HOME/lib
COPY flink-cdc-pipeline-connector-doris-3.0-SNAPSHOT.jar $FLINK_CDC_HOME/lib
COPY flink-cdc-pipeline-connector-mysql-3.0-SNAPSHOT.jar $FLINK_CDC_HOME/lib
# Copy the Flink CDC pipeline definition file. This is an example; users can replace it as needed.
COPY mysql-doris.yaml $FLINK_CDC_HOME/conf
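The contents of the copied pipeline definition file are not part of this diff. For orientation, a minimal sketch in the Flink CDC 3.0 pipeline format (all hostnames, credentials, and table patterns below are placeholders) could look like:

source:
  type: mysql
  hostname: mysql.default.svc        # placeholder address
  port: 3306
  username: root                     # placeholder credentials
  password: "123456"
  tables: app_db.\.*                 # placeholder table pattern

sink:
  type: doris
  fenodes: doris-fe.default.svc:8030 # placeholder Doris FE address
  username: root
  password: ""

pipeline:
  name: Sync MySQL to Doris
  parallelism: 2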
8 changes: 8 additions & 0 deletions flink-cdc-cli/pom.xml
@@ -49,6 +49,14 @@ limitations under the License.
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

</project>
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -17,14 +17,18 @@

package org.apache.flink.cdc.cli;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposeExecutor;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.executors.ComposeExecutorFactory;

import java.nio.file.Path;
import java.util.List;
@@ -38,14 +42,18 @@ public class CliExecutor {
private final boolean useMiniCluster;
private final List<Path> additionalJars;

private final CommandLine commandLine;

private PipelineComposer composer = null;

public CliExecutor(
CommandLine commandLine,
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List<Path> additionalJars) {
this.commandLine = commandLine;
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
@@ -54,22 +62,31 @@ public CliExecutor(
}

public PipelineExecution.ExecutionInfo run() throws Exception {
// Parse pipeline definition file
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);

// Create composer
PipelineComposer composer = getComposer(flinkConfig);

// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);

// Execute the pipeline
return execution.execute();
// Either deploy the Flink CDC job through a submit executor, or run it in place
boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
if (isDeploymentMode) {
ComposeExecutorFactory composeExecutorFactory = new ComposeExecutorFactory();
PipelineComposeExecutor composeExecutor =
composeExecutorFactory.getFlinkComposeExecutor(commandLine);
return composeExecutor.deploy(
commandLine,
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
additionalJars);
} else {
// Run the CDC job directly: parse the pipeline definition file
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer(flinkConfig);
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
// Execute or submit the pipeline
return execution.execute();
}
}

private PipelineComposer getComposer(Configuration flinkConfig) {
private PipelineComposer getComposer(Configuration flinkConfig) throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars);
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
@@ -31,8 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -76,13 +74,13 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
"Missing pipeline definition file path in arguments. ");
}

// Take the first unparsed argument as the pipeline definition file
Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
if (!Files.exists(pipelineDefPath)) {
throw new FileNotFoundException(
String.format("Cannot find pipeline definition file \"%s\"", pipelineDefPath));
Path pipelineDefPath = null;
// When running the CDC job locally, the pipeline definition path must be provided
if (!ConfigurationUtils.isDeploymentMode(commandLine)) {
// Take the first unparsed argument as the pipeline definition file
pipelineDefPath = Paths.get(unparsedArgs.get(0));
LOG.info("Pipeline definition file path: {}", pipelineDefPath);
}

// Global pipeline configuration
Configuration globalPipelineConfig = getGlobalConfig(commandLine);

@@ -101,6 +99,7 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {

// Build executor
return new CliExecutor(
commandLine,
pipelineDefPath,
flinkConfig,
globalPipelineConfig,
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -46,6 +46,16 @@ public class CliFrontendOptions {
.desc("JARs to be submitted together with the pipeline")
.build();

public static final Option TARGET =
Option.builder()
.longOpt("target")
.hasArg()
.desc(
"The deployment target for the execution. This can take one of the following values "
+ "local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes"
+ "-application")
.build();

public static final Option USE_MINI_CLUSTER =
Option.builder()
.longOpt("use-mini-cluster")
@@ -59,6 +69,7 @@ public static Options initializeOptions() {
.addOption(JAR)
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
.addOption(USE_MINI_CLUSTER);
.addOption(USE_MINI_CLUSTER)
.addOption(TARGET);
}
}
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -23,6 +23,8 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import org.apache.commons.cli.CommandLine;

import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -48,4 +50,11 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
e);
}
}

public static boolean isDeploymentMode(CommandLine commandLine) {
String target = commandLine.getOptionValue("target");
return target != null
&& !target.equalsIgnoreCase("local")
&& !target.equalsIgnoreCase("remote");
}
}
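For illustration, the sketch below (not part of the commit; the option definition mirrors CliFrontendOptions.TARGET) probes which --target values the new helper classifies as deployments. Anything other than local/remote, including yarn-application and kubernetes-session, is routed to the deployment path in CliExecutor.run().

import org.apache.flink.cdc.cli.utils.ConfigurationUtils;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

/** Illustrative only: probes isDeploymentMode with different --target values. */
public class IsDeploymentModeDemo {
    public static void main(String[] args) throws Exception {
        Options options =
                new Options().addOption(Option.builder().longOpt("target").hasArg().build());
        String[][] samples = {
            {},                                     // no target: run locally -> false
            {"--target", "local"},                  // explicit local -> false
            {"--target", "remote"},                 // remote session -> false
            {"--target", "kubernetes-application"}, // native K8s app mode -> true
        };
        for (String[] sample : samples) {
            CommandLine cmd = new DefaultParser().parse(options, sample);
            System.out.printf(
                    "[%s] -> %s%n",
                    String.join(" ", sample), ConfigurationUtils.isDeploymentMode(cmd));
        }
    }
}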
7 changes: 6 additions & 1 deletion flink-cdc-composer/pom.xml
@@ -48,14 +48,19 @@ limitations under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

</project>
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineComposeExecutor.java
@@ -0,0 +1,33 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer;

import org.apache.flink.configuration.Configuration;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

/** Executor that deploys a Flink CDC pipeline to a specific deployment target. */
public interface PipelineComposeExecutor {

PipelineExecution.ExecutionInfo deploy(
CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
throws Exception;
}
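The concrete K8SApplicationComposeExecutor referenced by the factory below is not shown in this excerpt. As a shape reference only, a minimal stub implementation of this interface (assuming PipelineExecution.ExecutionInfo exposes an (id, description) constructor) could look like:

package org.apache.flink.cdc.composer;

import org.apache.flink.configuration.Configuration;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

/** Hypothetical dry-run executor; a real one would hand the job off to a cluster. */
public class DryRunComposeExecutor implements PipelineComposeExecutor {

    @Override
    public PipelineExecution.ExecutionInfo deploy(
            CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
            throws Exception {
        String target = commandLine.getOptionValue("target");
        System.out.println(
                "Would deploy to target: " + target
                        + " with " + additionalJars.size() + " additional JAR(s)");
        // Assumption: ExecutionInfo offers an (id, description) constructor.
        return new PipelineExecution.ExecutionInfo("dry-run", "no job submitted");
    }
}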
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -54,7 +54,6 @@
@Internal
public class FlinkPipelineComposer implements PipelineComposer {

private final StreamExecutionEnvironment env;
private final boolean isBlocking;

public static FlinkPipelineComposer ofRemoteCluster(
@@ -77,21 +76,20 @@ public static FlinkPipelineComposer ofRemoteCluster(
e);
}
});
return new FlinkPipelineComposer(env, false);
return new FlinkPipelineComposer(false);
}

public static FlinkPipelineComposer ofMiniCluster() {
return new FlinkPipelineComposer(
StreamExecutionEnvironment.getExecutionEnvironment(), true);
return new FlinkPipelineComposer(true);
}

private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
this.env = env;
public FlinkPipelineComposer(boolean isBlocking) {
this.isBlocking = isBlocking;
}

@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
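// Create the environment at compose() time instead of caching it in the constructor:
// getExecutionEnvironment() then resolves the context environment of wherever compose()
// actually runs, e.g. the environment provided by the cluster entrypoint in application mode.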
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

@@ -105,7 +103,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
stream = routeTranslator.translate(stream, pipelineDef.getRoute());

// Create sink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
DataSink dataSink = createDataSink(env, pipelineDef.getSink(), pipelineDef.getConfig());

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -132,13 +130,14 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());

// Add framework JARs
addFrameworkJars();
addFrameworkJars(env);

return new FlinkPipelineExecution(
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}

private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
private DataSink createDataSink(
StreamExecutionEnvironment env, SinkDef sinkDef, Configuration pipelineConfig) {
// Search the data sink factory
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
@@ -156,7 +155,7 @@ private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
Thread.currentThread().getContextClassLoader()));
}

private void addFrameworkJars() {
private void addFrameworkJars(StreamExecutionEnvironment env) {
try {
Set<URI> frameworkJars = new HashSet<>();
// Common JAR
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/executors/ComposeExecutorFactory.java
@@ -0,0 +1,33 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer.flink.executors;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.cdc.composer.PipelineComposeExecutor;

/** Creates the deployment executor corresponding to the given target. */
public class ComposeExecutorFactory {

public PipelineComposeExecutor getFlinkComposeExecutor(CommandLine commandLine)
throws Exception {
String target = commandLine.getOptionValue("target");
if (target.equalsIgnoreCase("kubernetes-application")) {
return new K8SApplicationComposeExecutor();
}
throw new Exception(String.format("target %s is not supported", target));
}
}
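The single if branch is enough for this commit. If further targets (e.g. yarn-application, kubernetes-session) get executors later, a registry keyed by target name is one way to keep the factory open for extension. An illustrative sketch, not part of this commit:

import org.apache.flink.cdc.composer.PipelineComposeExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative registry-style alternative to the if/else dispatch above. */
public class ComposeExecutorRegistry {

    private final Map<String, Supplier<PipelineComposeExecutor>> executors = new HashMap<>();

    public void register(String target, Supplier<PipelineComposeExecutor> supplier) {
        executors.put(target.toLowerCase(), supplier);
    }

    public PipelineComposeExecutor get(String target) throws Exception {
        Supplier<PipelineComposeExecutor> supplier = executors.get(target.toLowerCase());
        if (supplier == null) {
            throw new Exception(String.format("target %s is not supported", target));
        }
        return supplier.get();
    }
}

Usage would be a one-time registry.register("kubernetes-application", K8SApplicationComposeExecutor::new) at startup, after which new targets need no factory changes.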