Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Apache Flink to 1.19.0 #16430

Merged
merged 2 commits into from
Mar 21, 2024

Conversation

lincoln-lil
Copy link
Contributor

@lincoln-lil lincoln-lil commented Mar 17, 2024

This adds the new Flink 1.19.0 release: https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/

and updates the tags on the 1.18 images accordingly.

@lincoln-lil lincoln-lil requested a review from a team as a code owner March 17, 2024 14:39

This comment has been minimized.

@yosifkit
Copy link
Member

        config_params+=" -D${key}=${value}"

Can the key or value added to config_params have whitespace characters in them? Using eval is often fraught with rough edges, so it makes me wary of code using it. I'm not certain if the eval is trying to allow key/value to reference other variables on purpose or not to know whether it is necessary or if the string could be swapped for an array to keep the items as separate string arguments:

-    local config_params=""
+    local config_params=()
...
-        config_params+=" -D${key}=${value}"
+        config_params+=("-D${key}=${value}")
...
-    if [ ! -z "${config_params}" ]; then
-        eval "${config_parser_script} ${config_dir} ${bin_dir} ${lib_dir} ${config_params}"
+    if [ "${#config_params[@]}" -gt 0 ]; then
+        "${config_parser_script}" "${config_dir}" "${bin_dir}" "${lib_dir}" "${config_params[@]}"

@lincoln-lil
Copy link
Contributor Author

Hi Junrui Lee could you help to confirm yosifkit's comment on these changes (refers to apache/flink-docker#175)?

@JunRuiLee
Copy link

Thank @yosifkit for your insightful comments. I agree that using eval can be risky, especially when dealing with parameters that might include whitespace or special characters. I'll update the script as suggested.

@lincoln-lil lincoln-lil force-pushed the release-flink-1.19.0 branch from 84d7897 to ccf6232 Compare March 19, 2024 05:50
Copy link

Diff for ccf6232:
diff --git a/_bashbrew-cat b/_bashbrew-cat
index 219f9d4..90b95ea 100644
--- a/_bashbrew-cat
+++ b/_bashbrew-cat
@@ -1,26 +1,6 @@
 Maintainers: The Apache Flink Project <[email protected]> (@ApacheFlink)
 GitRepo: https://github.com/apache/flink-docker.git
 
-Tags: 1.15.4-scala_2.12-java8, 1.15-scala_2.12-java8, 1.15.4-java8, 1.15-java8
-Architectures: amd64, arm64v8
-GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
-Directory: 1.15/scala_2.12-java8-ubuntu
-
-Tags: 1.15.4-scala_2.12-java11, 1.15-scala_2.12-java11, 1.15.4-scala_2.12, 1.15-scala_2.12, 1.15.4-java11, 1.15-java11, 1.15.4, 1.15
-Architectures: amd64, arm64v8
-GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
-Directory: 1.15/scala_2.12-java11-ubuntu
-
-Tags: 1.16.3-scala_2.12-java8, 1.16-scala_2.12-java8, 1.16.3-java8, 1.16-java8
-Architectures: amd64, arm64v8
-GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
-Directory: 1.16/scala_2.12-java8-ubuntu
-
-Tags: 1.16.3-scala_2.12-java11, 1.16-scala_2.12-java11, 1.16.3-scala_2.12, 1.16-scala_2.12, 1.16.3-java11, 1.16-java11, 1.16.3, 1.16
-Architectures: amd64, arm64v8
-GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
-Directory: 1.16/scala_2.12-java11-ubuntu
-
 Tags: 1.17.2-scala_2.12-java8, 1.17-scala_2.12-java8, 1.17.2-java8, 1.17-java8
 Architectures: amd64, arm64v8
 GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
@@ -31,17 +11,32 @@ Architectures: amd64, arm64v8
 GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
 Directory: 1.17/scala_2.12-java11-ubuntu
 
-Tags: 1.18.1-scala_2.12-java8, 1.18-scala_2.12-java8, scala_2.12-java8, 1.18.1-java8, 1.18-java8, java8
+Tags: 1.18.1-scala_2.12-java8, 1.18-scala_2.12-java8, 1.18.1-java8, 1.18-java8
 Architectures: amd64, arm64v8
 GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
 Directory: 1.18/scala_2.12-java8-ubuntu
 
-Tags: 1.18.1-scala_2.12-java11, 1.18-scala_2.12-java11, scala_2.12-java11, 1.18.1-scala_2.12, 1.18-scala_2.12, scala_2.12, 1.18.1-java11, 1.18-java11, java11, 1.18.1, 1.18, latest
+Tags: 1.18.1-scala_2.12-java11, 1.18-scala_2.12-java11, 1.18.1-scala_2.12, 1.18-scala_2.12, 1.18.1-java11, 1.18-java11, 1.18.1, 1.18
 Architectures: amd64, arm64v8
 GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
 Directory: 1.18/scala_2.12-java11-ubuntu
 
-Tags: 1.18.1-scala_2.12-java17, 1.18-scala_2.12-java17, scala_2.12-java17, 1.18.1-java17, 1.18-java17, java17
+Tags: 1.18.1-scala_2.12-java17, 1.18-scala_2.12-java17, 1.18.1-java17, 1.18-java17
 Architectures: amd64, arm64v8
 GitCommit: 8eb7ea1a0e668146b2da1dcd08e311e7f7f318f1
 Directory: 1.18/scala_2.12-java17-ubuntu
+
+Tags: 1.19.0-scala_2.12-java8, 1.19-scala_2.12-java8, scala_2.12-java8, 1.19.0-java8, 1.19-java8, java8
+Architectures: amd64, arm64v8
+GitCommit: 20017e8f0e81d54fe74c0f9f6a3a988ea609be8f
+Directory: 1.19/scala_2.12-java8-ubuntu
+
+Tags: 1.19.0-scala_2.12-java11, 1.19-scala_2.12-java11, scala_2.12-java11, 1.19.0-scala_2.12, 1.19-scala_2.12, scala_2.12, 1.19.0-java11, 1.19-java11, java11, 1.19.0, 1.19, latest
+Architectures: amd64, arm64v8
+GitCommit: 20017e8f0e81d54fe74c0f9f6a3a988ea609be8f
+Directory: 1.19/scala_2.12-java11-ubuntu
+
+Tags: 1.19.0-scala_2.12-java17, 1.19-scala_2.12-java17, scala_2.12-java17, 1.19.0-java17, 1.19-java17, java17
+Architectures: amd64, arm64v8
+GitCommit: 20017e8f0e81d54fe74c0f9f6a3a988ea609be8f
+Directory: 1.19/scala_2.12-java17-ubuntu
diff --git a/_bashbrew-list b/_bashbrew-list
index 35f2ab8..8cfbd58 100644
--- a/_bashbrew-list
+++ b/_bashbrew-list
@@ -1,27 +1,3 @@
-flink:1.15
-flink:1.15-java8
-flink:1.15-java11
-flink:1.15-scala_2.12
-flink:1.15-scala_2.12-java8
-flink:1.15-scala_2.12-java11
-flink:1.15.4
-flink:1.15.4-java8
-flink:1.15.4-java11
-flink:1.15.4-scala_2.12
-flink:1.15.4-scala_2.12-java8
-flink:1.15.4-scala_2.12-java11
-flink:1.16
-flink:1.16-java8
-flink:1.16-java11
-flink:1.16-scala_2.12
-flink:1.16-scala_2.12-java8
-flink:1.16-scala_2.12-java11
-flink:1.16.3
-flink:1.16.3-java8
-flink:1.16.3-java11
-flink:1.16.3-scala_2.12
-flink:1.16.3-scala_2.12-java8
-flink:1.16.3-scala_2.12-java11
 flink:1.17
 flink:1.17-java8
 flink:1.17-java11
@@ -50,6 +26,22 @@ flink:1.18.1-scala_2.12
 flink:1.18.1-scala_2.12-java8
 flink:1.18.1-scala_2.12-java11
 flink:1.18.1-scala_2.12-java17
+flink:1.19
+flink:1.19-java8
+flink:1.19-java11
+flink:1.19-java17
+flink:1.19-scala_2.12
+flink:1.19-scala_2.12-java8
+flink:1.19-scala_2.12-java11
+flink:1.19-scala_2.12-java17
+flink:1.19.0
+flink:1.19.0-java8
+flink:1.19.0-java11
+flink:1.19.0-java17
+flink:1.19.0-scala_2.12
+flink:1.19.0-scala_2.12-java8
+flink:1.19.0-scala_2.12-java11
+flink:1.19.0-scala_2.12-java17
 flink:java8
 flink:java11
 flink:java17
diff --git a/flink_1.15-java8/Dockerfile b/flink_1.15-java8/Dockerfile
deleted file mode 100644
index 50bafa4..0000000
diff --git a/flink_1.15/Dockerfile b/flink_1.15/Dockerfile
deleted file mode 100644
index 2f38d26..0000000
diff --git a/flink_1.16-java8/Dockerfile b/flink_1.16-java8/Dockerfile
deleted file mode 100644
index d7761af..0000000
diff --git a/flink_1.16/Dockerfile b/flink_1.16/Dockerfile
deleted file mode 100644
index 08c808b..0000000
diff --git a/flink_1.16/docker-entrypoint.sh b/flink_1.16/docker-entrypoint.sh
deleted file mode 100755
index 8b0350e..0000000
diff --git a/flink_java17/Dockerfile b/flink_1.18-java17/Dockerfile
similarity index 100%
copy from flink_java17/Dockerfile
copy to flink_1.18-java17/Dockerfile
diff --git a/flink_1.15-java8/docker-entrypoint.sh b/flink_1.18-java17/docker-entrypoint.sh
similarity index 100%
rename from flink_1.15-java8/docker-entrypoint.sh
rename to flink_1.18-java17/docker-entrypoint.sh
diff --git a/flink_java8/Dockerfile b/flink_1.18-java8/Dockerfile
similarity index 100%
copy from flink_java8/Dockerfile
copy to flink_1.18-java8/Dockerfile
diff --git a/flink_1.15/docker-entrypoint.sh b/flink_1.18-java8/docker-entrypoint.sh
similarity index 100%
rename from flink_1.15/docker-entrypoint.sh
rename to flink_1.18-java8/docker-entrypoint.sh
diff --git a/flink_latest/Dockerfile b/flink_1.18/Dockerfile
similarity index 100%
copy from flink_latest/Dockerfile
copy to flink_1.18/Dockerfile
diff --git a/flink_1.16-java8/docker-entrypoint.sh b/flink_1.18/docker-entrypoint.sh
similarity index 100%
rename from flink_1.16-java8/docker-entrypoint.sh
rename to flink_1.18/docker-entrypoint.sh
diff --git a/flink_java17/Dockerfile b/flink_java17/Dockerfile
index 27f23fb..5789596 100644
--- a/flink_java17/Dockerfile
+++ b/flink_java17/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz \
-    FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz.asc \
-    GPG_KEY=96AE0E32CBE6E0753CE6DF6CB078D1D3253A8D82 \
+ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.19.0/flink-1.19.0-bin-scala_2.12.tgz \
+    FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.19.0/flink-1.19.0-bin-scala_2.12.tgz.asc \
+    GPG_KEY=028B6605F51BC296B56A5042E57D30ABEE75CA06 \
     CHECK_GPG=true
 
 # Prepare environment
@@ -81,11 +81,22 @@ RUN set -ex; \
   chown -R flink:flink .; \
   \
   # Replace default REST/RPC endpoint bind address to use the container's network interface \
-  sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i '/taskmanager.host: localhost/d' $FLINK_HOME/conf/flink-conf.yaml;
+  CONF_FILE="$FLINK_HOME/conf/flink-conf.yaml"; \
+  if [ ! -e "$FLINK_HOME/conf/flink-conf.yaml" ]; then \
+    CONF_FILE="${FLINK_HOME}/conf/config.yaml"; \
+    /bin/bash "$FLINK_HOME/bin/config-parser-utils.sh" "${FLINK_HOME}/conf" "${FLINK_HOME}/bin" "${FLINK_HOME}/lib" \
+        "-repKV" "rest.address,localhost,0.0.0.0" \
+        "-repKV" "rest.bind-address,localhost,0.0.0.0" \
+        "-repKV" "jobmanager.bind-host,localhost,0.0.0.0" \
+        "-repKV" "taskmanager.bind-host,localhost,0.0.0.0" \
+        "-rmKV" "taskmanager.host=localhost"; \
+  else \
+    sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i '/taskmanager.host: localhost/d' "$CONF_FILE"; \
+  fi;
 
 # Configure container
 COPY docker-entrypoint.sh /
diff --git a/flink_java17/docker-entrypoint.sh b/flink_java17/docker-entrypoint.sh
index 8b0350e..e081109 100755
--- a/flink_java17/docker-entrypoint.sh
+++ b/flink_java17/docker-entrypoint.sh
@@ -23,7 +23,7 @@ COMMAND_HISTORY_SERVER="history-server"
 
 # If unspecified, the hostname of the container is taken as the JobManager address
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
-CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
+CONF_FILE_DIR="${FLINK_HOME}/conf"
 
 drop_privs_cmd() {
     if [ $(id -u) != 0 ]; then
@@ -59,34 +59,72 @@ copy_plugins_if_required() {
   done
 }
 
-set_config_option() {
-  local option=$1
-  local value=$2
+set_config_options() {
+    local config_parser_script="$FLINK_HOME/bin/config-parser-utils.sh"
+    local config_dir="$FLINK_HOME/conf"
+    local bin_dir="$FLINK_HOME/bin"
+    local lib_dir="$FLINK_HOME/lib"
 
-  # escape periods for usage in regular expressions
-  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
+    local config_params=()
 
-  # either override an existing entry, or append a new one
-  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
-        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
-  else
-        echo "${option}: ${value}" >> "${CONF_FILE}"
+    while [ $# -gt 0 ]; do
+        local key="$1"
+        local value="$2"
+
+        config_params+=("-D${key}=${value}")
+
+        shift 2
+    done
+
+    if [ "${#config_params[@]}" -gt 0 ]; then
+        "${config_parser_script}" "${config_dir}" "${bin_dir}" "${lib_dir}" "${config_params[@]}"
     fi
 }
 
 prepare_configuration() {
-    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
-    set_config_option blob.server.port 6124
-    set_config_option query.server.port 6125
+    local config_options=()
+
+    config_options+=("jobmanager.rpc.address" "${JOB_MANAGER_RPC_ADDRESS}")
+    config_options+=("blob.server.port" "6124")
+    config_options+=("query.server.port" "6125")
 
     if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
-        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
+        config_options+=("taskmanager.numberOfTaskSlots" "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}")
+    fi
+
+    if [ ${#config_options[@]} -ne 0 ]; then
+        set_config_options "${config_options[@]}"
     fi
 
     if [ -n "${FLINK_PROPERTIES}" ]; then
-        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
+        process_flink_properties "${FLINK_PROPERTIES}"
+    fi
+}
+
+process_flink_properties() {
+    local flink_properties_content=$1
+    local config_options=()
+
+    local OLD_IFS="$IFS"
+    IFS=$'\n'
+    for prop in $flink_properties_content; do
+        prop=$(echo $prop | tr -d '[:space:]')
+
+        if [ -z "$prop" ]; then
+            continue
+        fi
+
+        IFS=':' read -r key value <<< "$prop"
+
+        value=$(echo $value | envsubst)
+
+        config_options+=("$key" "$value")
+    done
+    IFS="$OLD_IFS"
+
+    if [ ${#config_options[@]} -ne 0 ]; then
+        set_config_options "${config_options[@]}"
     fi
-    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
 }
 
 maybe_enable_jemalloc() {
diff --git a/flink_java8/Dockerfile b/flink_java8/Dockerfile
index ed727e9..aed3c11 100644
--- a/flink_java8/Dockerfile
+++ b/flink_java8/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz \
-    FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz.asc \
-    GPG_KEY=96AE0E32CBE6E0753CE6DF6CB078D1D3253A8D82 \
+ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.19.0/flink-1.19.0-bin-scala_2.12.tgz \
+    FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.19.0/flink-1.19.0-bin-scala_2.12.tgz.asc \
+    GPG_KEY=028B6605F51BC296B56A5042E57D30ABEE75CA06 \
     CHECK_GPG=true
 
 # Prepare environment
@@ -81,11 +81,22 @@ RUN set -ex; \
   chown -R flink:flink .; \
   \
   # Replace default REST/RPC endpoint bind address to use the container's network interface \
-  sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i '/taskmanager.host: localhost/d' $FLINK_HOME/conf/flink-conf.yaml;
+  CONF_FILE="$FLINK_HOME/conf/flink-conf.yaml"; \
+  if [ ! -e "$FLINK_HOME/conf/flink-conf.yaml" ]; then \
+    CONF_FILE="${FLINK_HOME}/conf/config.yaml"; \
+    /bin/bash "$FLINK_HOME/bin/config-parser-utils.sh" "${FLINK_HOME}/conf" "${FLINK_HOME}/bin" "${FLINK_HOME}/lib" \
+        "-repKV" "rest.address,localhost,0.0.0.0" \
+        "-repKV" "rest.bind-address,localhost,0.0.0.0" \
+        "-repKV" "jobmanager.bind-host,localhost,0.0.0.0" \
+        "-repKV" "taskmanager.bind-host,localhost,0.0.0.0" \
+        "-rmKV" "taskmanager.host=localhost"; \
+  else \
+    sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i '/taskmanager.host: localhost/d' "$CONF_FILE"; \
+  fi;
 
 # Configure container
 COPY docker-entrypoint.sh /
diff --git a/flink_java8/docker-entrypoint.sh b/flink_java8/docker-entrypoint.sh
index 8b0350e..e081109 100755
--- a/flink_java8/docker-entrypoint.sh
+++ b/flink_java8/docker-entrypoint.sh
@@ -23,7 +23,7 @@ COMMAND_HISTORY_SERVER="history-server"
 
 # If unspecified, the hostname of the container is taken as the JobManager address
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
-CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
+CONF_FILE_DIR="${FLINK_HOME}/conf"
 
 drop_privs_cmd() {
     if [ $(id -u) != 0 ]; then
@@ -59,34 +59,72 @@ copy_plugins_if_required() {
   done
 }
 
-set_config_option() {
-  local option=$1
-  local value=$2
+set_config_options() {
+    local config_parser_script="$FLINK_HOME/bin/config-parser-utils.sh"
+    local config_dir="$FLINK_HOME/conf"
+    local bin_dir="$FLINK_HOME/bin"
+    local lib_dir="$FLINK_HOME/lib"
 
-  # escape periods for usage in regular expressions
-  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
+    local config_params=()
 
-  # either override an existing entry, or append a new one
-  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
-        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
-  else
-        echo "${option}: ${value}" >> "${CONF_FILE}"
+    while [ $# -gt 0 ]; do
+        local key="$1"
+        local value="$2"
+
+        config_params+=("-D${key}=${value}")
+
+        shift 2
+    done
+
+    if [ "${#config_params[@]}" -gt 0 ]; then
+        "${config_parser_script}" "${config_dir}" "${bin_dir}" "${lib_dir}" "${config_params[@]}"
     fi
 }
 
 prepare_configuration() {
-    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
-    set_config_option blob.server.port 6124
-    set_config_option query.server.port 6125
+    local config_options=()
+
+    config_options+=("jobmanager.rpc.address" "${JOB_MANAGER_RPC_ADDRESS}")
+    config_options+=("blob.server.port" "6124")
+    config_options+=("query.server.port" "6125")
 
     if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
-        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
+        config_options+=("taskmanager.numberOfTaskSlots" "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}")
+    fi
+
+    if [ ${#config_options[@]} -ne 0 ]; then
+        set_config_options "${config_options[@]}"
     fi
 
     if [ -n "${FLINK_PROPERTIES}" ]; then
-        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
+        process_flink_properties "${FLINK_PROPERTIES}"
+    fi
+}
+
+process_flink_properties() {
+    local flink_properties_content=$1
+    local config_options=()
+
+    local OLD_IFS="$IFS"
+    IFS=$'\n'
+    for prop in $flink_properties_content; do
+        prop=$(echo $prop | tr -d '[:space:]')
+
+        if [ -z "$prop" ]; then
+            continue
+        fi
+
+        IFS=':' read -r key value <<< "$prop"
+
+        value=$(echo $value | envsubst)
+
+        config_options+=("$key" "$value")
+    done
+    IFS="$OLD_IFS"
+
+    if [ ${#config_options[@]} -ne 0 ]; then
+        set_config_options "${config_options[@]}"
     fi
-    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
 }
 
 maybe_enable_jemalloc() {
diff --git a/flink_latest/Dockerfile b/flink_latest/Dockerfile
index 2dcbd27..b8e946d 100644
--- a/flink_latest/Dockerfile
+++ b/flink_latest/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz \
-    FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz.asc \
-    GPG_KEY=96AE0E32CBE6E0753CE6DF6CB078D1D3253A8D82 \
+ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.19.0/flink-1.19.0-bin-scala_2.12.tgz \
+    FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.19.0/flink-1.19.0-bin-scala_2.12.tgz.asc \
+    GPG_KEY=028B6605F51BC296B56A5042E57D30ABEE75CA06 \
     CHECK_GPG=true
 
 # Prepare environment
@@ -81,11 +81,22 @@ RUN set -ex; \
   chown -R flink:flink .; \
   \
   # Replace default REST/RPC endpoint bind address to use the container's network interface \
-  sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \
-  sed -i '/taskmanager.host: localhost/d' $FLINK_HOME/conf/flink-conf.yaml;
+  CONF_FILE="$FLINK_HOME/conf/flink-conf.yaml"; \
+  if [ ! -e "$FLINK_HOME/conf/flink-conf.yaml" ]; then \
+    CONF_FILE="${FLINK_HOME}/conf/config.yaml"; \
+    /bin/bash "$FLINK_HOME/bin/config-parser-utils.sh" "${FLINK_HOME}/conf" "${FLINK_HOME}/bin" "${FLINK_HOME}/lib" \
+        "-repKV" "rest.address,localhost,0.0.0.0" \
+        "-repKV" "rest.bind-address,localhost,0.0.0.0" \
+        "-repKV" "jobmanager.bind-host,localhost,0.0.0.0" \
+        "-repKV" "taskmanager.bind-host,localhost,0.0.0.0" \
+        "-rmKV" "taskmanager.host=localhost"; \
+  else \
+    sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' "$CONF_FILE"; \
+    sed -i '/taskmanager.host: localhost/d' "$CONF_FILE"; \
+  fi;
 
 # Configure container
 COPY docker-entrypoint.sh /
diff --git a/flink_latest/docker-entrypoint.sh b/flink_latest/docker-entrypoint.sh
index 8b0350e..e081109 100755
--- a/flink_latest/docker-entrypoint.sh
+++ b/flink_latest/docker-entrypoint.sh
@@ -23,7 +23,7 @@ COMMAND_HISTORY_SERVER="history-server"
 
 # If unspecified, the hostname of the container is taken as the JobManager address
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
-CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
+CONF_FILE_DIR="${FLINK_HOME}/conf"
 
 drop_privs_cmd() {
     if [ $(id -u) != 0 ]; then
@@ -59,34 +59,72 @@ copy_plugins_if_required() {
   done
 }
 
-set_config_option() {
-  local option=$1
-  local value=$2
+set_config_options() {
+    local config_parser_script="$FLINK_HOME/bin/config-parser-utils.sh"
+    local config_dir="$FLINK_HOME/conf"
+    local bin_dir="$FLINK_HOME/bin"
+    local lib_dir="$FLINK_HOME/lib"
 
-  # escape periods for usage in regular expressions
-  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
+    local config_params=()
 
-  # either override an existing entry, or append a new one
-  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
-        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
-  else
-        echo "${option}: ${value}" >> "${CONF_FILE}"
+    while [ $# -gt 0 ]; do
+        local key="$1"
+        local value="$2"
+
+        config_params+=("-D${key}=${value}")
+
+        shift 2
+    done
+
+    if [ "${#config_params[@]}" -gt 0 ]; then
+        "${config_parser_script}" "${config_dir}" "${bin_dir}" "${lib_dir}" "${config_params[@]}"
     fi
 }
 
 prepare_configuration() {
-    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
-    set_config_option blob.server.port 6124
-    set_config_option query.server.port 6125
+    local config_options=()
+
+    config_options+=("jobmanager.rpc.address" "${JOB_MANAGER_RPC_ADDRESS}")
+    config_options+=("blob.server.port" "6124")
+    config_options+=("query.server.port" "6125")
 
     if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
-        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
+        config_options+=("taskmanager.numberOfTaskSlots" "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}")
+    fi
+
+    if [ ${#config_options[@]} -ne 0 ]; then
+        set_config_options "${config_options[@]}"
     fi
 
     if [ -n "${FLINK_PROPERTIES}" ]; then
-        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
+        process_flink_properties "${FLINK_PROPERTIES}"
+    fi
+}
+
+process_flink_properties() {
+    local flink_properties_content=$1
+    local config_options=()
+
+    local OLD_IFS="$IFS"
+    IFS=$'\n'
+    for prop in $flink_properties_content; do
+        prop=$(echo $prop | tr -d '[:space:]')
+
+        if [ -z "$prop" ]; then
+            continue
+        fi
+
+        IFS=':' read -r key value <<< "$prop"
+
+        value=$(echo $value | envsubst)
+
+        config_options+=("$key" "$value")
+    done
+    IFS="$OLD_IFS"
+
+    if [ ${#config_options[@]} -ne 0 ]; then
+        set_config_options "${config_options[@]}"
     fi
-    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
 }
 
 maybe_enable_jemalloc() {

Relevant Maintainers:

  • flink: @ApacheFlink

@lincoln-lil
Copy link
Contributor Author

@yosifkit Thank you for reviewing this and thanks @JunRuiLee for the fix!
I've updated the pr include the above changes. Please take a look when you have time : )

@tianon
Copy link
Member

tianon commented Mar 19, 2024

What's the purpose of the if statement in the Dockerfile? Why would some builds have the configuration file already and some not?

@tianon
Copy link
Member

tianon commented Mar 19, 2024

(also #16114 (comment))

@JunRuiLee
Copy link

What's the purpose of the if statement in the Dockerfile? Why would some builds have the configuration file already and some not?

Good question! Let me try to explain that for you.

The if statement in the Dockerfile ensures compatibility with both the new standard YAML configuration file, config.yaml, introduced in FLINK-1.19, and the legacy configuration file, flink-conf.yaml. To guarantee forward compatibility, Flink continues to support the legacy flink-conf.yaml. When the new config.yaml is absent, Flink automatically falls back to retrieve the legacy flink-conf.yaml. As a result, there are two different code paths in the Dockerfile.

@lincoln-lil
Copy link
Contributor Author

@tianon Thanks for your reviewing!

What's the purpose of the if statement in the Dockerfile? Why would some builds have the configuration file already and some not?
As @JunRuiLee explained above, this is a compatibility step for upgrading the standard yaml configuration file in Flink.
After upgrading to version 2.0, the old configuration files will no longer be supported, but they can be handled compatibly in the current 1.19 version.

Regarding the comment you left during the 1.18.1 release(#16114 (comment)), this might be a case of forgetting to follow up 😅.
I have created a Jira https://issues.apache.org/jira/browse/FLINK-34746 to ensure that the next release
switches to the new CDN address, do you think that's okay? (Because considering that Flink 1.19.0 has been
officially announced on 18/03, users are already asking about the ready time of the Docker image)

@yosifkit yosifkit merged commit fe836e1 into docker-library:master Mar 21, 2024
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants