[FLINK-34853] Submit CDC Job To Flink K8S Native Application Mode
czy006 committed Apr 8, 2024
1 parent 927a0ec commit 7d1ae74
Showing 13 changed files with 325 additions and 35 deletions.
33 changes: 33 additions & 0 deletions Dockerfile
@@ -0,0 +1,33 @@
#/*
# * Licensed to the Apache Software Foundation (ASF) under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The ASF licenses this file to You under the Apache License, Version 2.0
# * (the "License"); you may not use this file except in compliance with
# * the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */

FROM flink

ARG FLINK_CDC_VERSION=3.0-SNAPSHOT

RUN mkdir -p /opt/flink-cdc
RUN mkdir -p /opt/flink/usrlib
ENV FLINK_CDC_HOME /opt/flink-cdc
COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/
RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \
mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \
rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION} /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz
# Copy pipeline connector JARs into the Flink user library
COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar
COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar
# Copy a Flink CDC pipeline definition file. This one is an example; users can replace it to match their needs.
COPY mysql-doris.yaml $FLINK_CDC_HOME/conf
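
For reference, a minimal sketch of what the example mysql-doris.yaml could contain, following the Flink CDC pipeline definition format; the hostnames, credentials, and table pattern below are placeholders, not part of this commit:

source:
  type: mysql
  hostname: mysql.example.com
  port: 3306
  username: root
  password: example-password
  tables: app_db.\.*

sink:
  type: doris
  fenodes: doris-fe.example.com:8030
  username: root
  password: ""

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
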
8 changes: 8 additions & 0 deletions flink-cdc-cli/pom.xml
@@ -49,6 +49,14 @@ limitations under the License.
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

</project>
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -19,12 +19,17 @@

import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;
@@ -38,14 +43,18 @@ public class CliExecutor {
private final boolean useMiniCluster;
private final List<Path> additionalJars;

private final CommandLine commandLine;

private PipelineComposer composer = null;

public CliExecutor(
CommandLine commandLine,
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List<Path> additionalJars) {
this.commandLine = commandLine;
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
@@ -54,22 +63,32 @@ public CliExecutor(
}

public PipelineExecution.ExecutionInfo run() throws Exception {
-        // Parse pipeline definition file
-        PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
-        PipelineDef pipelineDef =
-                pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
-
-        // Create composer
-        PipelineComposer composer = getComposer(flinkConfig);
-
-        // Compose pipeline
-        PipelineExecution execution = composer.compose(pipelineDef);
-
-        // Execute the pipeline
-        return execution.execute();
+        // Create a deployment executor to submit the Flink CDC job, or run it in process
+        boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
+        if (isDeploymentMode) {
+            ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory();
+            PipelineDeploymentExecutor composeExecutor =
+                    composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
+            return composeExecutor.deploy(
+                    commandLine,
+                    org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
+                    additionalJars);
+        } else {
+            // Parse the pipeline definition file and run the CDC job in process
+            PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+            PipelineDef pipelineDef =
+                    pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
+            // Create composer
+            PipelineComposer composer = getComposer(flinkConfig);
+            // Compose pipeline
+            PipelineExecution execution = composer.compose(pipelineDef);
+            // Execute or submit the pipeline
+            return execution.execute();
+        }
}

-    private PipelineComposer getComposer(Configuration flinkConfig) {
+    private PipelineComposer getComposer(Configuration flinkConfig) throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars);
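
A minimal sketch of driving the new executor in process, mirroring what CliFrontend wires up below; the no-arg Configuration constructor and the empty configurations are assumptions made for brevity:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;

import org.apache.flink.cdc.cli.CliExecutor;
import org.apache.flink.cdc.cli.CliFrontendOptions;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;

import java.nio.file.Paths;
import java.util.Collections;

public class RunLocalPipeline {
    public static void main(String[] args) throws Exception {
        // Parse the same options the CLI front end accepts.
        CommandLine cmd =
                new DefaultParser()
                        .parse(
                                CliFrontendOptions.initializeOptions(),
                                new String[] {"--use-mini-cluster", "mysql-doris.yaml"});
        // Without a -t deployment target, run() takes the in-process branch.
        CliExecutor executor =
                new CliExecutor(
                        cmd,
                        Paths.get("mysql-doris.yaml"),
                        new Configuration(), // Flink config (empty for brevity)
                        new Configuration(), // global pipeline config (empty for brevity)
                        true, // use mini cluster
                        Collections.emptyList());
        PipelineExecution.ExecutionInfo info = executor.run();
        System.out.println(info);
    }
}
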
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
Expand Up @@ -31,8 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.io.FileNotFoundException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -76,13 +74,9 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
"Missing pipeline definition file path in arguments. ");
}

-        // Take the first unparsed argument as the pipeline definition file
        Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
-        if (!Files.exists(pipelineDefPath)) {
-            throw new FileNotFoundException(
-                    String.format("Cannot find pipeline definition file \"%s\"", pipelineDefPath));
-        }
-
+        // Take the first unparsed argument as the pipeline definition file
+        LOG.info("Pipeline definition file path: {}", pipelineDefPath);
// Global pipeline configuration
Configuration globalPipelineConfig = getGlobalConfig(commandLine);

@@ -101,6 +95,7 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {

// Build executor
return new CliExecutor(
commandLine,
pipelineDefPath,
flinkConfig,
globalPipelineConfig,
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -46,6 +46,16 @@ public class CliFrontendOptions {
.desc("JARs to be submitted together with the pipeline")
.build();

public static final Option TARGET =
Option.builder("t")
.longOpt("target")
.hasArg()
.desc(
"The deployment target for the execution. This can take one of the following values "
+ "local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes"
+ "-application")
.build();

public static final Option USE_MINI_CLUSTER =
Option.builder()
.longOpt("use-mini-cluster")
@@ -59,6 +69,7 @@ public static Options initializeOptions() {
.addOption(JAR)
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
-                .addOption(USE_MINI_CLUSTER);
+                .addOption(USE_MINI_CLUSTER)
+                .addOption(TARGET);
}
}
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -18,16 +18,22 @@
package org.apache.flink.cdc.cli.utils;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.client.deployment.executors.RemoteExecutor;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import org.apache.commons.cli.CommandLine;

import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;

/** Utilities for handling {@link Configuration}. */
public class ConfigurationUtils {
public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
@@ -48,4 +54,11 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
e);
}
}

public static boolean isDeploymentMode(CommandLine commandLine) {
String target = commandLine.getOptionValue(TARGET);
return target != null
&& !target.equalsIgnoreCase(LocalExecutor.NAME)
&& !target.equalsIgnoreCase(RemoteExecutor.NAME);
}
}
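
A short sketch of how isDeploymentMode behaves, reusing the commons-cli parsing from the CLI front end; only "local", "remote", or an absent -t keep execution in the current process:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;

import org.apache.flink.cdc.cli.CliFrontendOptions;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;

public class DeploymentModeCheck {
    public static void main(String[] args) throws Exception {
        DefaultParser parser = new DefaultParser();
        CommandLine local =
                parser.parse(
                        CliFrontendOptions.initializeOptions(),
                        new String[] {"-t", "local", "mysql-doris.yaml"});
        CommandLine k8s =
                parser.parse(
                        CliFrontendOptions.initializeOptions(),
                        new String[] {"-t", "kubernetes-application", "mysql-doris.yaml"});

        // "local" and "remote" (or no -t at all) run through the composer in process...
        System.out.println(ConfigurationUtils.isDeploymentMode(local)); // false
        // ...while any other target is handed to a deployment executor.
        System.out.println(ConfigurationUtils.isDeploymentMode(k8s)); // true
    }
}
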
7 changes: 6 additions & 1 deletion flink-cdc-composer/pom.xml
@@ -48,14 +48,19 @@ limitations under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

</project>
33 changes: 33 additions & 0 deletions flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer;

import org.apache.flink.configuration.Configuration;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

/** PipelineDeploymentExecutor to deploy a Flink CDC job to different targets. */
public interface PipelineDeploymentExecutor {

PipelineExecution.ExecutionInfo deploy(
CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
throws Exception;
}
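
Concrete executors (such as the K8SApplicationDeploymentExecutor referenced below) are resolved through ComposeDeploymentFactory. As a purely hypothetical illustration of the contract, assuming the (id, description) ExecutionInfo constructor, a dry-run implementation might look like:

import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.configuration.Configuration;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

/** Hypothetical dry-run executor: reports what would be submitted without deploying. */
public class DryRunDeploymentExecutor implements PipelineDeploymentExecutor {

    @Override
    public PipelineExecution.ExecutionInfo deploy(
            CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
            throws Exception {
        // A real executor would build a cluster descriptor from flinkConfig,
        // ship additionalJars, and submit the pipeline definition as the job entry point.
        System.out.printf(
                "Would deploy %s with %d additional JAR(s)%n",
                commandLine.getArgList(), additionalJars.size());
        return new PipelineExecution.ExecutionInfo("dry-run", "No job was submitted");
    }
}
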
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -54,7 +54,6 @@
@Internal
public class FlinkPipelineComposer implements PipelineComposer {

-    private final StreamExecutionEnvironment env;
private final boolean isBlocking;

public static FlinkPipelineComposer ofRemoteCluster(
@@ -77,21 +76,20 @@ public static FlinkPipelineComposer ofRemoteCluster(
e);
}
});
-        return new FlinkPipelineComposer(env, false);
+        return new FlinkPipelineComposer(false);
}

public static FlinkPipelineComposer ofMiniCluster() {
-        return new FlinkPipelineComposer(
-                StreamExecutionEnvironment.getExecutionEnvironment(), true);
+        return new FlinkPipelineComposer(true);
}

-    private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
-        this.env = env;
+    public FlinkPipelineComposer(boolean isBlocking) {
this.isBlocking = isBlocking;
}

@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

@@ -105,7 +103,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
stream = routeTranslator.translate(stream, pipelineDef.getRoute());

// Create sink in advance as schema operator requires MetadataApplier
-        DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
+        DataSink dataSink = createDataSink(env, pipelineDef.getSink(), pipelineDef.getConfig());

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -132,13 +130,14 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());

// Add framework JARs
-        addFrameworkJars();
+        addFrameworkJars(env);

return new FlinkPipelineExecution(
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}

-    private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
+    private DataSink createDataSink(
+            StreamExecutionEnvironment env, SinkDef sinkDef, Configuration pipelineConfig) {
// Search the data sink factory
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
@@ -156,7 +155,7 @@ private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
Thread.currentThread().getContextClassLoader()));
}

-    private void addFrameworkJars() {
+    private void addFrameworkJars(StreamExecutionEnvironment env) {
try {
Set<URI> frameworkJars = new HashSet<>();
// Common JAR
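
Because the StreamExecutionEnvironment is now created inside compose() rather than captured at construction time, a composer can be instantiated before any execution environment exists, which is what application mode needs: the environment only becomes available once the job entry point runs inside the cluster. A minimal in-process usage sketch, again assuming a no-arg Configuration from flink-cdc-common:

import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;

import java.nio.file.Paths;

public class MiniClusterSmokeTest {
    public static void main(String[] args) throws Exception {
        // The environment is created inside compose(), not here.
        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
        PipelineDef pipelineDef =
                new YamlPipelineDefinitionParser()
                        .parse(Paths.get("mysql-doris.yaml"), new Configuration());
        PipelineExecution execution = composer.compose(pipelineDef);
        System.out.println(execution.execute());
    }
}
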
35 changes: 35 additions & 0 deletions flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer.flink.deployment;

import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;

import org.apache.commons.cli.CommandLine;

/** Creates the deployment executor corresponding to the given target. */
public class ComposeDeploymentFactory {

public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine)
throws Exception {
String target = commandLine.getOptionValue("target");
if (target.equalsIgnoreCase("kubernetes-application")) {
return new K8SApplicationDeploymentExecutor();
}
throw new Exception(String.format("target %s is not support", target));
}
}
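
A short sketch of the dispatch, reusing the commons-cli setup from the CLI front end; any target other than kubernetes-application currently raises an Exception, and the factory is only invoked once isDeploymentMode has already confirmed a non-null target:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;

import org.apache.flink.cdc.cli.CliFrontendOptions;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;

public class DeploymentDispatch {
    public static void main(String[] args) throws Exception {
        CommandLine cmd =
                new DefaultParser()
                        .parse(
                                CliFrontendOptions.initializeOptions(),
                                new String[] {"-t", "kubernetes-application", "mysql-doris.yaml"});
        // Resolves to K8SApplicationDeploymentExecutor for this target.
        PipelineDeploymentExecutor executor =
                new ComposeDeploymentFactory().getFlinkComposeExecutor(cmd);
        System.out.println(executor.getClass().getSimpleName());
    }
}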