Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35360] Support Flink CDC pipeline YARN application mode #3643

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 75 additions & 36 deletions flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,84 +20,123 @@
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
import org.apache.flink.cdc.composer.flink.deployment.K8SApplicationDeploymentExecutor;
import org.apache.flink.cdc.composer.flink.deployment.YarnApplicationDeploymentExecutor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.KUBERNETES_APPLICATION;
import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.LOCAL;
import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.REMOTE;
import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.YARN_APPLICATION;
import static org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.YARN_SESSION;

/** Executor for doing the composing and submitting logic for {@link CliFrontend}. */
public class CliExecutor {

private final Path pipelineDefPath;
private final Configuration flinkConfig;
private final Configuration globalPipelineConfig;
private final boolean useMiniCluster;
private final List<Path> additionalJars;

private final Path flinkHome;
private final CommandLine commandLine;

private PipelineComposer composer = null;

private final SavepointRestoreSettings savepointSettings;

/**
 * Creates a CLI executor holding everything needed to compose and submit a CDC pipeline.
 *
 * @param commandLine parsed command-line options driving deployment-mode selection
 * @param pipelineDefPath path to the YAML pipeline definition file
 * @param flinkConfig Flink cluster configuration loaded from the Flink home
 * @param globalPipelineConfig global pipeline defaults merged into the definition
 * @param useMiniCluster whether to run on an embedded mini cluster
 * @param additionalJars extra JARs to ship with the job
 * @param savepointSettings savepoint restore settings applied on remote submission
 * @param flinkHome Flink home directory, forwarded to application-mode deployers
 */
public CliExecutor(
        CommandLine commandLine,
        Path pipelineDefPath,
        Configuration flinkConfig,
        Configuration globalPipelineConfig,
        boolean useMiniCluster,
        List<Path> additionalJars,
        SavepointRestoreSettings savepointSettings,
        Path flinkHome) {
    this.commandLine = commandLine;
    this.pipelineDefPath = pipelineDefPath;
    this.flinkConfig = flinkConfig;
    this.globalPipelineConfig = globalPipelineConfig;
    this.useMiniCluster = useMiniCluster;
    this.additionalJars = additionalJars;
    this.savepointSettings = savepointSettings;
    this.flinkHome = flinkHome;
}

public PipelineExecution.ExecutionInfo run() throws Exception {
// Create Submit Executor to deployment flink cdc job Or Run Flink CDC Job
boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
if (isDeploymentMode) {
ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory();
PipelineDeploymentExecutor composeExecutor =
composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
return composeExecutor.deploy(
commandLine,
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
additionalJars);
} else {
// Run CDC Job And Parse pipeline definition file
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer();
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
// Execute or submit the pipeline
return execution.execute();
String deploymentTarget = ConfigurationUtils.getDeploymentMode(commandLine);
switch (deploymentTarget) {
case KUBERNETES_APPLICATION:
return deployWithApplicationComposer(new K8SApplicationDeploymentExecutor());
case YARN_APPLICATION:
return deployWithApplicationComposer(new YarnApplicationDeploymentExecutor());
case LOCAL:
return deployWithLocalExecutor();
case REMOTE:
case YARN_SESSION:
return deployWithRemoteExecutor();
default:
throw new IllegalArgumentException(
String.format("Deployment target %s is not supported", deploymentTarget));
}
}

private PipelineComposer getComposer() throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars, savepointSettings);
}
return composer;
/**
 * Hands the job to an application-mode deployment executor (Kubernetes or YARN).
 *
 * @param composeExecutor the target-specific deployment executor
 * @return execution info reported by the deployer
 */
private PipelineExecution.ExecutionInfo deployWithApplicationComposer(
        PipelineDeploymentExecutor composeExecutor) throws Exception {
    // Bridge the CDC configuration into a Flink Configuration for the deployer.
    org.apache.flink.configuration.Configuration deployConfig =
            org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
    return composeExecutor.deploy(commandLine, deployConfig, additionalJars, flinkHome);
}

/**
 * Runs the pipeline on an embedded mini cluster (local deployment target).
 *
 * @return execution info of the locally executed pipeline
 */
private PipelineExecution.ExecutionInfo deployWithLocalExecutor() throws Exception {
return executePipeline(FlinkPipelineComposer.ofMiniCluster());
}

/**
 * Submits the pipeline to an already-running remote cluster (remote / yarn-session).
 *
 * @return execution info of the submitted pipeline
 */
private PipelineExecution.ExecutionInfo deployWithRemoteExecutor() throws Exception {
    // Convert CDC config to a Flink Configuration, then fold in savepoint restore settings
    // so the remote submission resumes from the requested savepoint if one was given.
    org.apache.flink.configuration.Configuration clusterConf =
            org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
    SavepointRestoreSettings.toConfiguration(savepointSettings, clusterConf);
    return executePipeline(FlinkPipelineComposer.ofRemoteCluster(clusterConf, additionalJars));
}

/**
 * Parses the pipeline definition, composes it with the given composer, and executes it.
 *
 * @param composer the composer that turns the definition into a runnable pipeline
 * @return execution info of the executed pipeline
 */
private PipelineExecution.ExecutionInfo executePipeline(PipelineComposer composer)
        throws Exception {
    // Parse the YAML definition, merging in the global pipeline defaults.
    PipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
    PipelineDef definition = parser.parse(pipelineDefPath, globalPipelineConfig);
    return composer.compose(definition).execute();
}

/**
 * Test hook: runs the pipeline with the composer held in the {@code composer} field.
 *
 * <p>NOTE(review): {@code composer} defaults to {@code null}; this assumes a test has
 * injected one beforehand — confirm against the test setup.
 */
@VisibleForTesting
public PipelineExecution.ExecutionInfo deployWithNoOpComposer() throws Exception {
return executePipeline(this.composer);
}

/**
 * Entry point used inside an application-mode cluster (Kubernetes / YARN application).
 *
 * <p>The deployer ships the pipeline definition and invokes this main with its path as
 * the first argument; the pipeline is then composed against the cluster's own
 * {@link StreamExecutionEnvironment}.
 *
 * @param args {@code args[0]} must be the path of the pipeline definition file
 * @throws IllegalArgumentException if no pipeline definition path was supplied
 */
public static void main(String[] args) throws Exception {
    // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 1) {
        throw new IllegalArgumentException(
                "Missing pipeline definition file path in arguments.");
    }
    PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
    org.apache.flink.core.fs.Path pipelineDefPath = new org.apache.flink.core.fs.Path(args[0]);
    // No global config is available inside the application cluster; use an empty one.
    PipelineDef pipelineDef =
            pipelineDefinitionParser.parse(pipelineDefPath, new Configuration());
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkPipelineComposer flinkPipelineComposer =
            FlinkPipelineComposer.ofApplicationCluster(env);
    PipelineExecution execution = flinkPipelineComposer.compose(pipelineDef);
    execution.execute();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand All @@ -34,8 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -81,7 +80,7 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
"Missing pipeline definition file path in arguments. ");
}

Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
Path pipelineDefPath = new Path(unparsedArgs.get(0));
// Take the first unparsed argument as the pipeline definition file
LOG.info("Real Path pipelineDefPath {}", pipelineDefPath);
// Global pipeline configuration
Expand All @@ -100,7 +99,7 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
Optional.ofNullable(
commandLine.getOptionValues(CliFrontendOptions.JAR))
.orElse(new String[0]))
.map(Paths::get)
.map(Path::new)
.collect(Collectors.toList());

// Build executor
Expand All @@ -109,9 +108,9 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
pipelineDefPath,
flinkConfig,
globalPipelineConfig,
commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER),
additionalJars,
savepointSettings);
savepointSettings,
flinkHome);
}

private static SavepointRestoreSettings createSavepointRestoreSettings(
Expand Down Expand Up @@ -142,14 +141,14 @@ private static Path getFlinkHome(CommandLine commandLine) {
String flinkHomeFromArgs = commandLine.getOptionValue(CliFrontendOptions.FLINK_HOME);
if (flinkHomeFromArgs != null) {
LOG.debug("Flink home is loaded by command-line argument: {}", flinkHomeFromArgs);
return Paths.get(flinkHomeFromArgs);
return new Path(flinkHomeFromArgs);
}

// Fallback to environment variable
String flinkHomeFromEnvVar = System.getenv(FLINK_HOME_ENV_VAR);
if (flinkHomeFromEnvVar != null) {
LOG.debug("Flink home is loaded by environment variable: {}", flinkHomeFromEnvVar);
return Paths.get(flinkHomeFromEnvVar);
return new Path(flinkHomeFromEnvVar);
}

throw new IllegalArgumentException(
Expand All @@ -162,16 +161,15 @@ private static Configuration getGlobalConfig(CommandLine commandLine) throws Exc
// Try to get global config path from command line
String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG);
if (globalConfig != null) {
Path globalConfigPath = Paths.get(globalConfig);
Path globalConfigPath = new Path(globalConfig);
LOG.info("Using global config in command line: {}", globalConfigPath);
return ConfigurationUtils.loadConfigFile(globalConfigPath);
}

// Fallback to Flink CDC home
String flinkCdcHome = System.getenv(FLINK_CDC_HOME_ENV_VAR);
if (flinkCdcHome != null) {
Path globalConfigPath =
Paths.get(flinkCdcHome).resolve("conf").resolve("flink-cdc.yaml");
Path globalConfigPath = new Path(flinkCdcHome, "/conf" + "/flink-cdc.yaml");
LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath);
return ConfigurationUtils.loadConfigFile(globalConfigPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.PipelineDef;

import java.nio.file.Path;
import org.apache.flink.core.fs.Path;

/** Parsing pipeline definition files and generate {@link PipelineDef}. */
public interface PipelineDefinitionParser {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -93,7 +95,9 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
/**
 * Parses a pipeline definition file into a {@link PipelineDef}.
 *
 * <p>Reads through Flink's {@link FileSystem} abstraction so the definition may live on
 * any supported scheme (local, HDFS, ...), not just the local file system.
 *
 * @param pipelineDefPath location of the YAML pipeline definition
 * @param globalPipelineConfig global defaults merged into the parsed definition
 */
@Override
public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig)
        throws Exception {
    FileSystem fileSystem = FileSystem.get(pipelineDefPath.toUri());
    // try-with-resources: the input stream was previously never closed (resource leak).
    try (FSDataInputStream inStream = fileSystem.open(pipelineDefPath)) {
        return parse(mapper.readTree(inStream), globalPipelineConfig);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.flink.cdc.cli.utils;

import org.apache.flink.cdc.cli.CliFrontendOptions;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.core.fs.Path;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,7 +43,8 @@ public static Configuration loadConfigFile(Path configPath) throws Exception {
/**
 * Loads a YAML configuration file into a flattened {@link Configuration}.
 *
 * <p>NOTE(review): {@code configPath.toUri().getPath()} only yields a usable
 * {@link File} for local paths; a remote scheme (e.g. {@code hdfs://}) would need to be
 * read through Flink's FileSystem instead — confirm callers only pass local files here.
 *
 * @param configPath path of the YAML file to load
 * @param allowDuplicateKeys whether duplicate YAML keys are tolerated
 */
public static Configuration loadConfigFile(Path configPath, boolean allowDuplicateKeys)
        throws Exception {
    Map<String, Object> configMap =
            YamlParserUtils.loadYamlFile(
                    new File(configPath.toUri().getPath()), allowDuplicateKeys);
    // Flatten nested YAML maps into dotted keys before building the Configuration.
    return Configuration.fromMap(flattenConfigMap(configMap, ""));
}

Expand All @@ -69,10 +71,10 @@ private static Map<String, String> flattenConfigMap(
return flattenedMap;
}

public static boolean isDeploymentMode(CommandLine commandLine) {
String target = commandLine.getOptionValue(TARGET);
return target != null
&& !target.equalsIgnoreCase(LocalExecutor.NAME)
&& !target.equalsIgnoreCase(RemoteExecutor.NAME);
/**
 * Resolves the effective deployment target for this run.
 *
 * <p>{@code --use-mini-cluster} forces local execution and takes precedence over any
 * {@code --target} value; otherwise the {@code --target} option is used, defaulting to
 * local execution when absent.
 *
 * @param commandLine the parsed command line
 * @return the deployment target name (e.g. the local executor name)
 */
public static String getDeploymentMode(CommandLine commandLine) {
if (commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER)) {
return LocalExecutor.NAME;
}
return commandLine.getOptionValue(TARGET, LocalExecutor.NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
package org.apache.flink.cdc.cli.utils;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.core.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.util.List;

/** Utilities for handling Flink configuration and environment. */
public class FlinkEnvironmentUtils {

Expand All @@ -36,26 +32,13 @@ public class FlinkEnvironmentUtils {
private static final String FLINK_CONF_FILENAME = "config.yaml";

public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
if (flinkConfPath.toFile().exists()) {
Path flinkConfPath = new Path(flinkHome, "/" + FLINK_CONF_DIR + "/" + FLINK_CONF_FILENAME);
if (flinkConfPath.getFileSystem().exists(flinkConfPath)) {
return ConfigurationUtils.loadConfigFile(flinkConfPath);
} else {
return ConfigurationUtils.loadConfigFile(
flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME), true);
}
}

public static FlinkPipelineComposer createComposer(
boolean useMiniCluster,
Configuration flinkConfig,
List<Path> additionalJars,
SavepointRestoreSettings savepointSettings) {
if (useMiniCluster) {
return FlinkPipelineComposer.ofMiniCluster();
new Path(flinkHome, "/" + FLINK_CONF_DIR + "/" + LEGACY_FLINK_CONF_FILENAME),
true);
}
org.apache.flink.configuration.Configuration configuration =
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
SavepointRestoreSettings.toConfiguration(savepointSettings, configuration);
return FlinkPipelineComposer.ofRemoteCluster(configuration, additionalJars);
}
}
Loading
Loading