diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000000..cf4bf34d736
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,33 @@
+#/*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+FROM flink
+
+ARG FLINK_CDC_VERSION=3.0-SNAPSHOT
+
+RUN mkdir -p /opt/flink-cdc
+RUN mkdir -p /opt/flink/usrlib
+ENV FLINK_CDC_HOME /opt/flink-cdc
+COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/
+RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \
+ mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \
+ rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION} /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz
+# copy jars to cdc libs
+COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar
+COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar
+# copy flink cdc pipeline conf file, Here is an example. Users can replace it according to their needs.
+COPY mysql-doris.yaml $FLINK_CDC_HOME/conf
diff --git a/flink-cdc-cli/pom.xml b/flink-cdc-cli/pom.xml
index 4dae643904b..4e6395337ac 100644
--- a/flink-cdc-cli/pom.xml
+++ b/flink-cdc-cli/pom.xml
@@ -49,6 +49,14 @@ limitations under the License.
commons-cli
${commons-cli.version}
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+ provided
+
+
\ No newline at end of file
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
index febe8b3318b..06f80c2ed47 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
@@ -19,12 +19,17 @@
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;
+
+import org.apache.commons.cli.CommandLine;
import java.nio.file.Path;
import java.util.List;
@@ -38,14 +43,18 @@ public class CliExecutor {
private final boolean useMiniCluster;
private final List additionalJars;
+ private final CommandLine commandLine;
+
private PipelineComposer composer = null;
public CliExecutor(
+ CommandLine commandLine,
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List additionalJars) {
+ this.commandLine = commandLine;
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
@@ -54,22 +63,32 @@ public CliExecutor(
}
public PipelineExecution.ExecutionInfo run() throws Exception {
- // Parse pipeline definition file
- PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
- PipelineDef pipelineDef =
- pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
-
- // Create composer
- PipelineComposer composer = getComposer(flinkConfig);
-
- // Compose pipeline
- PipelineExecution execution = composer.compose(pipelineDef);
-
- // Execute the pipeline
- return execution.execute();
+ // Create Submit Executor to deployment flink cdc job Or Run Flink CDC Job
+ boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
+ if (isDeploymentMode) {
+ ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory();
+ PipelineDeploymentExecutor composeExecutor =
+ composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
+ return composeExecutor.deploy(
+ commandLine,
+ org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
+ additionalJars);
+ } else {
+
+ // Run CDC Job And Parse pipeline definition file
+ PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef =
+ pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
+ // Create composer
+ PipelineComposer composer = getComposer(flinkConfig);
+ // Compose pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ // Execute or submit the pipeline
+ return execution.execute();
+ }
}
- private PipelineComposer getComposer(Configuration flinkConfig) {
+ private PipelineComposer getComposer(Configuration flinkConfig) throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars);
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
index cd54333d52a..17c591b6e0c 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java
@@ -31,8 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -76,13 +74,9 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
"Missing pipeline definition file path in arguments. ");
}
- // Take the first unparsed argument as the pipeline definition file
Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
- if (!Files.exists(pipelineDefPath)) {
- throw new FileNotFoundException(
- String.format("Cannot find pipeline definition file \"%s\"", pipelineDefPath));
- }
-
+ // Take the first unparsed argument as the pipeline definition file
+ LOG.info("Real Path pipelineDefPath {}", pipelineDefPath);
// Global pipeline configuration
Configuration globalPipelineConfig = getGlobalConfig(commandLine);
@@ -101,6 +95,7 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
// Build executor
return new CliExecutor(
+ commandLine,
pipelineDefPath,
flinkConfig,
globalPipelineConfig,
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
index fd3507d52b4..730c809a10c 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -46,6 +46,16 @@ public class CliFrontendOptions {
.desc("JARs to be submitted together with the pipeline")
.build();
+ public static final Option TARGET =
+ Option.builder("t")
+ .longOpt("target")
+ .hasArg()
+ .desc(
+ "The deployment target for the execution. This can take one of the following values "
+ + "local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes"
+ + "-application")
+ .build();
+
public static final Option USE_MINI_CLUSTER =
Option.builder()
.longOpt("use-mini-cluster")
@@ -59,6 +69,7 @@ public static Options initializeOptions() {
.addOption(JAR)
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
- .addOption(USE_MINI_CLUSTER);
+ .addOption(USE_MINI_CLUSTER)
+ .addOption(TARGET);
}
}
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
index efe0a514347..25b326877e0 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -18,16 +18,22 @@
package org.apache.flink.cdc.cli.utils;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.client.deployment.executors.LocalExecutor;
+import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.apache.commons.cli.CommandLine;
+
import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
+
/** Utilities for handling {@link Configuration}. */
public class ConfigurationUtils {
public static Configuration loadMapFormattedConfig(Path configPath) throws Exception {
@@ -48,4 +54,11 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
e);
}
}
+
+ public static boolean isDeploymentMode(CommandLine commandLine) {
+ String target = commandLine.getOptionValue(TARGET);
+ return target != null
+ && !target.equalsIgnoreCase(LocalExecutor.NAME)
+ && !target.equalsIgnoreCase(RemoteExecutor.NAME);
+ }
}
diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index d17b7085f9f..1ce6e8b659a 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -48,7 +48,7 @@ limitations under the License.
org.apache.flink
flink-clients
${flink.version}
- test
+ provided
org.apache.flink
@@ -56,6 +56,11 @@ limitations under the License.
${flink.version}
test
+
+ org.apache.flink
+ flink-kubernetes
+ ${flink.version}
+
\ No newline at end of file
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
new file mode 100644
index 00000000000..37d573e6bfd
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineDeploymentExecutor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.commons.cli.CommandLine;
+
+import java.nio.file.Path;
+import java.util.List;
+
+/** PipelineDeploymentExecutor to execute flink cdc job from different target. */
+public interface PipelineDeploymentExecutor {
+
+ PipelineExecution.ExecutionInfo deploy(
+ CommandLine commandLine, Configuration flinkConfig, List additionalJars)
+ throws Exception;
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index f8a32283918..2a4906f71b1 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -54,7 +54,6 @@
@Internal
public class FlinkPipelineComposer implements PipelineComposer {
- private final StreamExecutionEnvironment env;
private final boolean isBlocking;
public static FlinkPipelineComposer ofRemoteCluster(
@@ -77,21 +76,20 @@ public static FlinkPipelineComposer ofRemoteCluster(
e);
}
});
- return new FlinkPipelineComposer(env, false);
+ return new FlinkPipelineComposer(false);
}
public static FlinkPipelineComposer ofMiniCluster() {
- return new FlinkPipelineComposer(
- StreamExecutionEnvironment.getExecutionEnvironment(), true);
+ return new FlinkPipelineComposer(true);
}
- private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
- this.env = env;
+ public FlinkPipelineComposer(boolean isBlocking) {
this.isBlocking = isBlocking;
}
@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);
@@ -105,7 +103,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
stream = routeTranslator.translate(stream, pipelineDef.getRoute());
// Create sink in advance as schema operator requires MetadataApplier
- DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
+ DataSink dataSink = createDataSink(env, pipelineDef.getSink(), pipelineDef.getConfig());
// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -132,13 +130,14 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
// Add framework JARs
- addFrameworkJars();
+ addFrameworkJars(env);
return new FlinkPipelineExecution(
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}
- private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
+ private DataSink createDataSink(
+ StreamExecutionEnvironment env, SinkDef sinkDef, Configuration pipelineConfig) {
// Search the data sink factory
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
@@ -156,7 +155,7 @@ private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
Thread.currentThread().getContextClassLoader()));
}
- private void addFrameworkJars() {
+ private void addFrameworkJars(StreamExecutionEnvironment env) {
try {
Set frameworkJars = new HashSet<>();
// Common JAR
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
new file mode 100644
index 00000000000..675c2797421
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
+
+import org.apache.commons.cli.CommandLine;
+
+/** Create deployment methods corresponding to different goals. */
+public class ComposeDeploymentFactory {
+
+ public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine)
+ throws Exception {
+ String target = commandLine.getOptionValue("target");
+ if (target.equalsIgnoreCase("kubernetes-application")) {
+ return new K8SApplicationDeploymentExecutor();
+ }
+ throw new Exception(String.format("target %s is not support", target));
+ }
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
new file mode 100644
index 00000000000..301cdd41a5e
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.deployment;
+
+import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+
+import org.apache.commons.cli.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** deploy flink cdc job by native k8s application mode. */
+public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecutor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class);
+
+ @Override
+ public PipelineExecution.ExecutionInfo deploy(
+ CommandLine commandLine, Configuration flinkConfig, List additionalJars) {
+ LOG.info("Submitting application in 'Flink K8S Application Mode'.");
+ flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
+ List jars = new ArrayList<>();
+ // must be added cdc dist jar by default docker container path
+ jars.add("local:///opt/flink-cdc/lib/flink-cdc-dist-3.0-SNAPSHOT.jar");
+ flinkConfig.set(PipelineOptions.JARS, jars);
+ // set the default cdc latest docker image
+ flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink/flink-cdc:latest");
+ flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList());
+ flinkConfig.set(
+ ApplicationConfiguration.APPLICATION_MAIN_CLASS,
+ "org.apache.flink.cdc.cli.CliFrontend");
+ KubernetesClusterClientFactory kubernetesClusterClientFactory =
+ new KubernetesClusterClientFactory();
+ KubernetesClusterDescriptor descriptor =
+ kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig);
+ ClusterSpecification specification =
+ kubernetesClusterClientFactory.getClusterSpecification(flinkConfig);
+ ApplicationConfiguration applicationConfiguration =
+ ApplicationConfiguration.fromConfiguration(flinkConfig);
+ ClusterClient client = null;
+ try {
+ ClusterClientProvider clusterClientProvider =
+ descriptor.deployApplicationCluster(specification, applicationConfiguration);
+ client = clusterClientProvider.getClusterClient();
+ String clusterId = client.getClusterId();
+ LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId);
+ return new PipelineExecution.ExecutionInfo(clusterId, "submit job successful");
+ } catch (Exception e) {
+ if (client != null) {
+ client.shutDownCluster();
+ }
+ throw new RuntimeException("deploy flink cdc job failed ", e);
+ } finally {
+ descriptor.close();
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
index 2eb79aeadce..4f7649d9e4d 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -93,6 +93,12 @@ public static Optional getJarPathByIdentifier(
try {
T factory = getFactoryByIdentifier(identifier, factoryClass);
URL url = factory.getClass().getProtectionDomain().getCodeSource().getLocation();
+ String urlString = url.toString();
+ if (urlString.contains("usrlib")) {
+ String flinkHome = System.getenv("FLINK_HOME");
+ urlString = urlString.replace("usrlib", flinkHome + "/usrlib");
+ }
+ url = new URL(urlString);
if (Files.isDirectory(Paths.get(url.toURI()))) {
LOG.warn(
"The factory class \"{}\" is contained by directory \"{}\" instead of JAR. "
@@ -104,7 +110,8 @@ public static Optional getJarPathByIdentifier(
return Optional.of(url);
} catch (Exception e) {
throw new RuntimeException(
- String.format("Failed to search JAR by factory identifier \"%s\"", identifier));
+ String.format("Failed to search JAR by factory identifier \"%s\"", identifier),
+ e);
}
}
}
diff --git a/mysql-doris.yaml b/mysql-doris.yaml
new file mode 100644
index 00000000000..f31c50b54fb
--- /dev/null
+++ b/mysql-doris.yaml
@@ -0,0 +1,41 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+################################################################################
+# Description: Sync MySQL all tables to Doris
+################################################################################
+source:
+ type: mysql
+ hostname: localhost
+ port: 3306
+ username: root
+ password: 123456
+ tables: app_db.\.*
+ server-id: 5400-5404
+ server-time-zone: UTC
+
+sink:
+ type: doris
+ fenodes: 127.0.0.1:8030
+ username: root
+ password: ""
+ table.create.properties.light_schema_change: true
+ table.create.properties.replication_num: 1
+
+pipeline:
+ name: Sync MySQL Database to Doris
+ parallelism: 2