diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000000..ba0df98632e
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,16 @@
+FROM flink
+RUN mkdir -p /opt/flink-cdc
+ENV FLINK_CDC_HOME /opt/flink-cdc
+COPY flink-cdc-dist/target/flink-cdc-3.0-SNAPSHOT-bin.tar.gz /tmp/
+RUN tar -xzvf /tmp/flink-cdc-3.0-SNAPSHOT-bin.tar.gz -C /tmp/ && \
+ mv /tmp/flink-cdc-3.0-SNAPSHOT/* $FLINK_CDC_HOME/ && \
+ rm -rf /tmp/flink-cdc-3.0-SNAPSHOT /tmp/flink-cdc-3.0-SNAPSHOT-bin.tar.gz
+# copy jars to cdc libs
+#RUN wget -P $FLINK_CDC_HOME/lib https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar
+#RUN wget -P $FLINK_CDC_HOME/lib https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar
+COPY flink-cdc-pipeline-connector-doris-3.0-SNAPSHOT.jar $FLINK_HOME/lib
+COPY flink-cdc-pipeline-connector-mysql-3.0-SNAPSHOT.jar $FLINK_HOME/lib
+COPY flink-cdc-pipeline-connector-doris-3.0-SNAPSHOT.jar $FLINK_CDC_HOME/lib
+COPY flink-cdc-pipeline-connector-mysql-3.0-SNAPSHOT.jar $FLINK_CDC_HOME/lib
+# Copy the Flink CDC pipeline config file. This is an example; users can replace it as needed.
+COPY mysql-doirs.yaml $FLINK_CDC_HOME/conf
diff --git a/flink-cdc-cli/pom.xml b/flink-cdc-cli/pom.xml
index a2013d26a8f..a4c0593d521 100644
--- a/flink-cdc-cli/pom.xml
+++ b/flink-cdc-cli/pom.xml
@@ -49,6 +49,14 @@ limitations under the License.
commons-cli
${commons-cli.version}
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+ provided
+
+
\ No newline at end of file
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
index febe8b3318b..246bc8ed168 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -17,14 +17,18 @@
package org.apache.flink.cdc.cli;
+import org.apache.commons.cli.CommandLine;
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.composer.PipelineComposeExecutor;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.flink.executors.ComposeExecutorFactory;
import java.nio.file.Path;
import java.util.List;
@@ -38,14 +42,18 @@ public class CliExecutor {
private final boolean useMiniCluster;
private final List additionalJars;
+ private final CommandLine commandLine;
+
private PipelineComposer composer = null;
public CliExecutor(
+ CommandLine commandLine,
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List additionalJars) {
+ this.commandLine = commandLine;
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
@@ -54,22 +62,31 @@ public CliExecutor(
}
public PipelineExecution.ExecutionInfo run() throws Exception {
- // Parse pipeline definition file
- PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
- PipelineDef pipelineDef =
- pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
-
- // Create composer
- PipelineComposer composer = getComposer(flinkConfig);
-
- // Compose pipeline
- PipelineExecution execution = composer.compose(pipelineDef);
-
- // Execute the pipeline
- return execution.execute();
+ // Create a submit executor to deploy the Flink CDC job, or run the job directly
+ boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
+ if (isDeploymentMode) {
+ ComposeExecutorFactory composeExecutorFactory = new ComposeExecutorFactory();
+ PipelineComposeExecutor composeExecutor =
+ composeExecutorFactory.getFlinkComposeExecutor(commandLine);
+ return composeExecutor.deploy(
+ commandLine,
+ org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
+ additionalJars);
+ } else {
+ // Run CDC Job And Parse pipeline definition file
+ PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef =
+ pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
+ // Create composer
+ PipelineComposer composer = getComposer(flinkConfig);
+ // Compose pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ // Execute or submit the pipeline
+ return execution.execute();
+ }
}
- private PipelineComposer getComposer(Configuration flinkConfig) {
+ private PipelineComposer getComposer(Configuration flinkConfig) throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars);
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
index cd54333d52a..21eb4cddf70 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
@@ -31,8 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -76,13 +74,13 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
"Missing pipeline definition file path in arguments. ");
}
- // Take the first unparsed argument as the pipeline definition file
- Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
- if (!Files.exists(pipelineDefPath)) {
- throw new FileNotFoundException(
- String.format("Cannot find pipeline definition file \"%s\"", pipelineDefPath));
+ Path pipelineDefPath = null;
+ // If running the CDC job locally, the pipeline definition path must be set
+ if (!ConfigurationUtils.isDeploymentMode(commandLine)) {
+ // Take the first unparsed argument as the pipeline definition file
+ pipelineDefPath = Paths.get(unparsedArgs.get(0));
+ LOG.info("Pipeline definition file path: {}", pipelineDefPath);
}
-
// Global pipeline configuration
Configuration globalPipelineConfig = getGlobalConfig(commandLine);
@@ -101,6 +99,7 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
// Build executor
return new CliExecutor(
+ commandLine,
pipelineDefPath,
flinkConfig,
globalPipelineConfig,
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
index fd3507d52b4..bad9df80d40 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -46,6 +46,16 @@ public class CliFrontendOptions {
.desc("JARs to be submitted together with the pipeline")
.build();
+ public static final Option TARGET =
+ Option.builder()
+ .longOpt("target")
+ .hasArg()
+ .desc(
+ "The deployment target for the execution. This can take one of the following values "
+ + "local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes"
+ + "-application")
+ .build();
+
public static final Option USE_MINI_CLUSTER =
Option.builder()
.longOpt("use-mini-cluster")
@@ -59,6 +69,7 @@ public static Options initializeOptions() {
.addOption(JAR)
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
- .addOption(USE_MINI_CLUSTER);
+ .addOption(USE_MINI_CLUSTER)
+ .addOption(TARGET);
}
}
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
index efe0a514347..d7232efef6d 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -23,6 +23,8 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.apache.commons.cli.CommandLine;
+
import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -48,4 +50,11 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
e);
}
}
+
+ public static boolean isDeploymentMode(CommandLine commandLine) {
+ String target = commandLine.getOptionValue("target");
+ return target != null
+ && !target.equalsIgnoreCase("local")
+ && !target.equalsIgnoreCase("remote");
+ }
}
diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index 51c85fcf89d..2b142204e6e 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -48,7 +48,7 @@ limitations under the License.
org.apache.flink
flink-clients
${flink.version}
- test
+ provided
org.apache.flink
@@ -56,6 +56,11 @@ limitations under the License.
${flink.version}
test
+
+ org.apache.flink
+ flink-kubernetes
+ ${flink.version}
+
\ No newline at end of file
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineComposeExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineComposeExecutor.java
new file mode 100644
index 00000000000..9a490f085e8
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineComposeExecutor.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer;
+
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.commons.cli.CommandLine;
+
+import java.nio.file.Path;
+import java.util.List;
+
+/** PipelineComposeExecutor executes a Flink CDC job on different deployment targets. */
+public interface PipelineComposeExecutor {
+
+ PipelineExecution.ExecutionInfo deploy(
+ CommandLine commandLine, Configuration flinkConfig, List additionalJars)
+ throws Exception;
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index f8a32283918..2a4906f71b1 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -54,7 +54,6 @@
@Internal
public class FlinkPipelineComposer implements PipelineComposer {
- private final StreamExecutionEnvironment env;
private final boolean isBlocking;
public static FlinkPipelineComposer ofRemoteCluster(
@@ -77,21 +76,20 @@ public static FlinkPipelineComposer ofRemoteCluster(
e);
}
});
- return new FlinkPipelineComposer(env, false);
+ return new FlinkPipelineComposer(false);
}
public static FlinkPipelineComposer ofMiniCluster() {
- return new FlinkPipelineComposer(
- StreamExecutionEnvironment.getExecutionEnvironment(), true);
+ return new FlinkPipelineComposer(true);
}
- private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
- this.env = env;
+ public FlinkPipelineComposer(boolean isBlocking) {
this.isBlocking = isBlocking;
}
@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);
@@ -105,7 +103,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
stream = routeTranslator.translate(stream, pipelineDef.getRoute());
// Create sink in advance as schema operator requires MetadataApplier
- DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
+ DataSink dataSink = createDataSink(env, pipelineDef.getSink(), pipelineDef.getConfig());
// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -132,13 +130,14 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
// Add framework JARs
- addFrameworkJars();
+ addFrameworkJars(env);
return new FlinkPipelineExecution(
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}
- private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
+ private DataSink createDataSink(
+ StreamExecutionEnvironment env, SinkDef sinkDef, Configuration pipelineConfig) {
// Search the data sink factory
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
@@ -156,7 +155,7 @@ private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
Thread.currentThread().getContextClassLoader()));
}
- private void addFrameworkJars() {
+ private void addFrameworkJars(StreamExecutionEnvironment env) {
try {
Set frameworkJars = new HashSet<>();
// Common JAR
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/executors/ComposeExecutorFactory.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/executors/ComposeExecutorFactory.java
new file mode 100644
index 00000000000..14bfc4bc953
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/executors/ComposeExecutorFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.executors;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.flink.cdc.composer.PipelineComposeExecutor;
+
+/** Factory that creates the compose executor corresponding to the deployment target. */
+public class ComposeExecutorFactory {
+
+ public PipelineComposeExecutor getFlinkComposeExecutor(CommandLine commandLine)
+ throws Exception {
+ String target = commandLine.getOptionValue("target");
+ if (target.equalsIgnoreCase("kubernetes-application")) {
+ return new K8SApplicationComposeExecutor();
+ }
+ throw new Exception(String.format("target %s is not supported", target));
+ }
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/executors/K8SApplicationComposeExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/executors/K8SApplicationComposeExecutor.java
new file mode 100644
index 00000000000..abc5a7ee7aa
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/executors/K8SApplicationComposeExecutor.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cdc.composer.flink.executors;
+
+import org.apache.flink.cdc.composer.PipelineComposeExecutor;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+
+import org.apache.commons.cli.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+public class K8SApplicationComposeExecutor implements PipelineComposeExecutor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(K8SApplicationComposeExecutor.class);
+
+ @Override
+ public PipelineExecution.ExecutionInfo deploy(
+ CommandLine commandLine, Configuration flinkConfig, List additionalJars) {
+ LOG.info("Submitting application in 'Flink K8S Application Mode'.");
+ flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
+ List jars =
+ new ArrayList<>();
+ // the CDC dist jar must be added via its default path inside the docker container
+ jars.add("local:///opt/flink-cdc/lib/flink-cdc-dist-3.0-SNAPSHOT.jar");
+ flinkConfig.set(PipelineOptions.JARS, jars);
+ // set the default cdc latest docker image
+ flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink/flink-cdc:latest");
+ flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList());
+ flinkConfig.set(
+ ApplicationConfiguration.APPLICATION_MAIN_CLASS,
+ "org.apache.flink.cdc.cli.CliFrontend");
+ KubernetesClusterClientFactory kubernetesClusterClientFactory =
+ new KubernetesClusterClientFactory();
+ KubernetesClusterDescriptor descriptor =
+ kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig);
+ ClusterSpecification specification =
+ kubernetesClusterClientFactory.getClusterSpecification(flinkConfig);
+ ApplicationConfiguration applicationConfiguration =
+ ApplicationConfiguration.fromConfiguration(flinkConfig);
+ ClusterClient client = null;
+ try {
+ ClusterClientProvider clusterClientProvider =
+ descriptor.deployApplicationCluster(specification, applicationConfiguration);
+ client = clusterClientProvider.getClusterClient();
+ String clusterId = client.getClusterId();
+ LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId);
+ return new PipelineExecution.ExecutionInfo(clusterId, "submit job successful");
+ } catch (Exception e) {
+ if (client != null) {
+ client.shutDownCluster();
+ }
+ throw new RuntimeException("deploy flink cdc job failed", e);
+ } finally {
+ descriptor.close();
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+}
diff --git a/mysql-doirs.yaml b/mysql-doirs.yaml
new file mode 100644
index 00000000000..c345937e7f5
--- /dev/null
+++ b/mysql-doirs.yaml
@@ -0,0 +1,24 @@
+################################################################################
+# Description: Sync MySQL all tables to Doris
+################################################################################
+source:
+ type: mysql
+ hostname: localhost
+ port: 3306
+ username: root
+ password: 123456
+ tables: app_db.\.*
+ server-id: 5400-5404
+ server-time-zone: UTC
+
+sink:
+ type: doris
+ fenodes: 127.0.0.1:8030
+ username: root
+ password: ""
+ table.create.properties.light_schema_change: true
+ table.create.properties.replication_num: 1
+
+pipeline:
+ name: Sync MySQL Database to Doris
+ parallelism: 2