entry : map.entrySet()) {
+ properties.setProperty(entry.getKey(), entry.getValue());
+ }
+ return new Configuration(properties);
+ }
+
+ public Properties toProperties() {
+ Properties clonedConfiguration = new Properties();
+ clonedConfiguration.putAll(configuration);
+ return clonedConfiguration;
+ }
+
+ /** Loads configuration from the configuration file. */
+ /**
+ * Loads a YAML-file of key-value pairs.
+ *
+ * Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a
+ * single-line comment.
+ *
+ *
Example:
+ *
+ *
+ * remote-shuffle.manager.rpc-address: localhost # network address for communication with the shuffle manager
+ * remote-shuffle.manager.rpc-port : 23123 # network port to connect to for communication with the shuffle manager
+ *
+ *
+ * This does not span the whole YAML specification, but only the *syntax* of simple YAML
+ * key-value pairs (see issue #113 on GitHub). If at any point in time, there is a need to go
+ * beyond simple key-value pairs syntax compatibility will allow to introduce a YAML parser
+ * library.
+ *
+ * @param confDir the conf dir.
+ * @see YAML 1.2 specification
+ */
+ /** Loads configuration from the configuration file. */
+ private static Properties loadConfiguration(String confDir) throws IOException {
+ File confFile = new File(confDir, REMOTE_SHUFFLE_CONF_FILENAME);
+ Properties configuration = new Properties();
+
+ if (!confFile.exists()) {
+ LOG.warn(
+ "Configuration file {} does not exist, only dynamic parameters will be used.",
+ confFile.getAbsolutePath());
+ return configuration;
+ }
+
+ if (!confFile.isFile()) {
+ throw new ConfigurationException(
+ String.format(
+ "Configuration file %s is not a normal file.",
+ confFile.getAbsoluteFile()));
+ }
+
+ LOG.info("Loading configurations from config file: {}", confFile);
+ try (BufferedReader reader =
+ new BufferedReader(new InputStreamReader(new FileInputStream(confFile)))) {
+
+ String line;
+ int lineNo = 0;
+ while ((line = reader.readLine()) != null) {
+ lineNo++;
+ // 1. check for comments
+ String[] comments = line.split("#", 2);
+ String conf = comments[0].trim();
+
+ // 2. get key and value
+ if (conf.length() > 0) {
+ String[] kv = conf.split(": ", 2);
+
+ // skip line with no valid key-value pair
+ if (kv.length == 1) {
+ LOG.warn(
+ "Error while trying to split key and value in configuration file "
+ + confFile
+ + ":"
+ + lineNo
+ + ": \""
+ + line
+ + "\"");
+ continue;
+ }
+
+ String key = kv[0].trim();
+ String value = kv[1].trim();
+
+ // sanity check
+ if (key.length() == 0 || value.length() == 0) {
+ LOG.warn(
+ "Error after splitting key and value in configuration file "
+ + confFile
+ + ":"
+ + lineNo
+ + ": \""
+ + line
+ + "\"");
+ continue;
+ }
+
+ LOG.info("Loading configuration property: {}, {}", key, value);
+ configuration.setProperty(key, value);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error parsing YAML configuration.", e);
+ }
+ return configuration;
+ }
+
+ private static String convertToString(Object o) {
+ if (o.getClass() == String.class) {
+ return (String) o;
+ } else if (o.getClass() == Duration.class) {
+ Duration duration = (Duration) o;
+ return String.format("%d ns", duration.toNanos());
+ } else if (o instanceof List) {
+ return ((List>) o)
+ .stream()
+ .map(e -> escapeWithSingleQuote(convertToString(e), ";"))
+ .collect(Collectors.joining(";"));
+ } else if (o instanceof Map) {
+ return ((Map, ?>) o)
+ .entrySet().stream()
+ .map(
+ e -> {
+ String escapedKey =
+ escapeWithSingleQuote(e.getKey().toString(), ":");
+ String escapedValue =
+ escapeWithSingleQuote(e.getValue().toString(), ":");
+
+ return escapeWithSingleQuote(
+ escapedKey + ":" + escapedValue, ",");
+ })
+ .collect(Collectors.joining(","));
+ }
+
+ return o.toString();
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/config/MemorySize.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/config/MemorySize.java
new file mode 100644
index 00000000..da92101f
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/config/MemorySize.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.config;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static com.alibaba.flink.shuffle.common.config.MemorySize.MemoryUnit.BYTES;
+import static com.alibaba.flink.shuffle.common.config.MemorySize.MemoryUnit.GIGA_BYTES;
+import static com.alibaba.flink.shuffle.common.config.MemorySize.MemoryUnit.KILO_BYTES;
+import static com.alibaba.flink.shuffle.common.config.MemorySize.MemoryUnit.MEGA_BYTES;
+import static com.alibaba.flink.shuffle.common.config.MemorySize.MemoryUnit.TERA_BYTES;
+import static com.alibaba.flink.shuffle.common.config.MemorySize.MemoryUnit.hasUnit;
+import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkArgument;
+import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkNotNull;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ *
Parsing
+ *
+ * The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes.
+ *
+ *
This class is copied from Apache Flink (org.apache.flink.configuration.MemorySize)
+ */
+public class MemorySize implements java.io.Serializable, Comparable {
+
+ private static final long serialVersionUID = 450443291938254568L;
+
+ public static final MemorySize ZERO = new MemorySize(0L);
+
+ public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);
+
+ private static final List ORDERED_UNITS =
+ Arrays.asList(BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES, TERA_BYTES);
+
+ // ------------------------------------------------------------------------
+
+ /** The memory size, in bytes. */
+ private final long bytes;
+
+ /** The memorized value returned by toString(). */
+ private transient String stringified;
+
+ /** The memorized value returned by toHumanReadableString(). */
+ private transient String humanReadableStr;
+
+ /**
+ * Constructs a new MemorySize.
+ *
+ * @param bytes The size, in bytes. Must be zero or larger.
+ */
+ public MemorySize(long bytes) {
+ checkArgument(bytes >= 0, "bytes must be >= 0");
+ this.bytes = bytes;
+ }
+
+ public static MemorySize ofMebiBytes(long mebiBytes) {
+ return new MemorySize(mebiBytes << 20);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Gets the memory size in bytes. */
+ public long getBytes() {
+ return bytes;
+ }
+
+ /** Gets the memory size in Kibibytes (= 1024 bytes). */
+ public long getKibiBytes() {
+ return bytes >> 10;
+ }
+
+ /** Gets the memory size in Mebibytes (= 1024 Kibibytes). */
+ public int getMebiBytes() {
+ return (int) (bytes >> 20);
+ }
+
+ /** Gets the memory size in Gibibytes (= 1024 Mebibytes). */
+ public long getGibiBytes() {
+ return bytes >> 30;
+ }
+
+ /** Gets the memory size in Tebibytes (= 1024 Gibibytes). */
+ public long getTebiBytes() {
+ return bytes >> 40;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return (int) (bytes ^ (bytes >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this
+ || (obj != null
+ && obj.getClass() == this.getClass()
+ && ((MemorySize) obj).bytes == this.bytes);
+ }
+
+ @Override
+ public String toString() {
+ if (stringified == null) {
+ stringified = formatToString();
+ }
+
+ return stringified;
+ }
+
+ private String formatToString() {
+ MemoryUnit highestIntegerUnit =
+ IntStream.range(0, ORDERED_UNITS.size())
+ .sequential()
+ .filter(idx -> bytes % ORDERED_UNITS.get(idx).getMultiplier() != 0)
+ .boxed()
+ .findFirst()
+ .map(
+ idx -> {
+ if (idx == 0) {
+ return ORDERED_UNITS.get(0);
+ } else {
+ return ORDERED_UNITS.get(idx - 1);
+ }
+ })
+ .orElse(BYTES);
+
+ return String.format(
+ "%d %s",
+ bytes / highestIntegerUnit.getMultiplier(), highestIntegerUnit.getUnits()[1]);
+ }
+
+ public String toHumanReadableString() {
+ if (humanReadableStr == null) {
+ humanReadableStr = formatToHumanReadableString();
+ }
+
+ return humanReadableStr;
+ }
+
+ private String formatToHumanReadableString() {
+ MemoryUnit highestUnit =
+ IntStream.range(0, ORDERED_UNITS.size())
+ .sequential()
+ .filter(idx -> bytes > ORDERED_UNITS.get(idx).getMultiplier())
+ .boxed()
+ .max(Comparator.naturalOrder())
+ .map(ORDERED_UNITS::get)
+ .orElse(BYTES);
+
+ if (highestUnit == BYTES) {
+ return String.format("%d %s", bytes, BYTES.getUnits()[1]);
+ } else {
+ double approximate = 1.0 * bytes / highestUnit.getMultiplier();
+ return String.format(
+ Locale.ROOT,
+ "%.3f%s (%d bytes)",
+ approximate,
+ highestUnit.getUnits()[1],
+ bytes);
+ }
+ }
+
+ @Override
+ public int compareTo(MemorySize that) {
+ return Long.compare(this.bytes, that.bytes);
+ }
+
+ // ------------------------------------------------------------------------
+ // Calculations
+ // ------------------------------------------------------------------------
+
+ public MemorySize add(MemorySize that) {
+ return new MemorySize(Math.addExact(this.bytes, that.bytes));
+ }
+
+ public MemorySize subtract(MemorySize that) {
+ return new MemorySize(Math.subtractExact(this.bytes, that.bytes));
+ }
+
+ public MemorySize multiply(double multiplier) {
+ checkArgument(multiplier >= 0, "multiplier must be >= 0");
+
+ BigDecimal product =
+ BigDecimal.valueOf(this.bytes).multiply(BigDecimal.valueOf(multiplier));
+ if (product.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) {
+ throw new ArithmeticException("long overflow");
+ }
+ return new MemorySize(product.longValue());
+ }
+
+ public MemorySize divide(long by) {
+ checkArgument(by >= 0, "divisor must be >= 0");
+ return new MemorySize(bytes / by);
+ }
+
+ // ------------------------------------------------------------------------
+ // Parsing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Parses the given string as as MemorySize.
+ *
+ * @param text The string to parse
+ * @return The parsed MemorySize
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static MemorySize parse(String text) throws IllegalArgumentException {
+ return new MemorySize(parseBytes(text));
+ }
+
+ /**
+ * Parses the given string with a default unit.
+ *
+ * @param text The string to parse.
+ * @param defaultUnit specify the default unit.
+ * @return The parsed MemorySize.
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static MemorySize parse(String text, MemoryUnit defaultUnit)
+ throws IllegalArgumentException {
+ if (!hasUnit(text)) {
+ return parse(text + defaultUnit.getUnits()[0]);
+ }
+
+ return parse(text);
+ }
+
+ /**
+ * Parses the given string as bytes. The supported expressions are listed under {@link
+ * MemorySize}.
+ *
+ * @param text The string to parse
+ * @return The parsed size, in bytes.
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static long parseBytes(String text) throws IllegalArgumentException {
+ checkNotNull(text);
+
+ final String trimmed = text.trim();
+ checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String number = trimmed.substring(0, pos);
+ final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+ if (number.isEmpty()) {
+ throw new NumberFormatException("text does not start with a number");
+ }
+
+ final long value;
+ try {
+ value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "The value '"
+ + number
+ + "' cannot be re represented as 64bit number (numeric overflow).");
+ }
+
+ final long multiplier = parseUnit(unit).map(MemoryUnit::getMultiplier).orElse(1L);
+ final long result = value * multiplier;
+
+ // check for overflow
+ if (result / multiplier != value) {
+ throw new IllegalArgumentException(
+ "The value '"
+ + text
+ + "' cannot be re represented as 64bit number of bytes (numeric overflow).");
+ }
+
+ return result;
+ }
+
+ private static Optional parseUnit(String unit) {
+ if (matchesAny(unit, BYTES)) {
+ return Optional.of(BYTES);
+ } else if (matchesAny(unit, KILO_BYTES)) {
+ return Optional.of(KILO_BYTES);
+ } else if (matchesAny(unit, MEGA_BYTES)) {
+ return Optional.of(MEGA_BYTES);
+ } else if (matchesAny(unit, GIGA_BYTES)) {
+ return Optional.of(GIGA_BYTES);
+ } else if (matchesAny(unit, TERA_BYTES)) {
+ return Optional.of(TERA_BYTES);
+ } else if (!unit.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Memory size unit '"
+ + unit
+ + "' does not match any of the recognized units: "
+ + MemoryUnit.getAllUnits());
+ }
+
+ return Optional.empty();
+ }
+
+ private static boolean matchesAny(String str, MemoryUnit unit) {
+ for (String s : unit.getUnits()) {
+ if (s.equals(str)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Enum which defines memory unit, mostly used to parse value from configuration file.
+ *
+ * To make larger values more compact, the common size suffixes are supported:
+ *
+ *
+ * - 1b or 1bytes (bytes)
+ *
- 1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+ *
- 1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+ *
- 1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+ *
- 1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+ *
+ */
+ public enum MemoryUnit {
+ BYTES(new String[] {"b", "bytes"}, 1L),
+ KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L),
+ MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L),
+ GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L),
+ TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L);
+
+ private final String[] units;
+
+ private final long multiplier;
+
+ MemoryUnit(String[] units, long multiplier) {
+ this.units = units;
+ this.multiplier = multiplier;
+ }
+
+ public String[] getUnits() {
+ return units;
+ }
+
+ public long getMultiplier() {
+ return multiplier;
+ }
+
+ public static String getAllUnits() {
+ return concatenateUnits(
+ BYTES.getUnits(),
+ KILO_BYTES.getUnits(),
+ MEGA_BYTES.getUnits(),
+ GIGA_BYTES.getUnits(),
+ TERA_BYTES.getUnits());
+ }
+
+ public static boolean hasUnit(String text) {
+ checkNotNull(text);
+
+ final String trimmed = text.trim();
+ checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+ return unit.length() > 0;
+ }
+
+ private static String concatenateUnits(final String[]... allUnits) {
+ final StringBuilder builder = new StringBuilder(128);
+
+ for (String[] units : allUnits) {
+ builder.append('(');
+
+ for (String unit : units) {
+ builder.append(unit);
+ builder.append(" | ");
+ }
+
+ builder.setLength(builder.length() - 3);
+ builder.append(") / ");
+ }
+
+ builder.setLength(builder.length() - 3);
+ return builder.toString();
+ }
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/config/StructuredOptionsSplitter.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/config/StructuredOptionsSplitter.java
new file mode 100644
index 00000000..91d19397
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/config/StructuredOptionsSplitter.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.config;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkNotNull;
+
+/**
+ * Helper class for splitting a string on a given delimiter with quoting logic.
+ *
+ * This class is copied from Apache Flink
+ * (org.apache.flink.configuration.StructuredOptionsSplitter).
+ */
+class StructuredOptionsSplitter {
+
+ /**
+ * Splits the given string on the given delimiter. It supports quoting parts of the string with
+ * either single (') or double quotes ("). Quotes can be escaped by doubling the quotes.
+ *
+ *
Examples:
+ *
+ *
+ * - 'A;B';C => [A;B], [C]
+ *
- "AB'D";B;C => [AB'D], [B], [C]
+ *
- "AB'""D;B";C => [AB'\"D;B], [C]
+ *
+ *
+ * For more examples check the tests.
+ *
+ * @param string a string to split
+ * @param delimiter delimiter to split on
+ * @return a list of splits
+ */
+ static List splitEscaped(String string, char delimiter) {
+ List tokens = tokenize(checkNotNull(string), delimiter);
+ return processTokens(tokens);
+ }
+
+ /**
+ * Escapes the given string with single quotes, if the input string contains a double quote or
+ * any of the given {@code charsToEscape}. Any single quotes in the input string will be escaped
+ * by doubling.
+ *
+ * Given that the escapeChar is (;)
+ *
+ *
Examples:
+ *
+ *
+ * - A,B,C,D => A,B,C,D
+ *
- A'B'C'D => 'A''B''C''D'
+ *
- A;BCD => 'A;BCD'
+ *
- AB"C"D => 'AB"C"D'
+ *
- AB'"D:B => 'AB''"D:B'
+ *
+ *
+ * @param string a string which needs to be escaped
+ * @param charsToEscape escape chars for the escape conditions
+ * @return escaped string by single quote
+ */
+ static String escapeWithSingleQuote(String string, String... charsToEscape) {
+ boolean escape =
+ Arrays.stream(charsToEscape).anyMatch(string::contains)
+ || string.contains("\"")
+ || string.contains("'");
+
+ if (escape) {
+ return "'" + string.replaceAll("'", "''") + "'";
+ }
+
+ return string;
+ }
+
+ private static List processTokens(List tokens) {
+ final List splits = new ArrayList<>();
+ for (int i = 0; i < tokens.size(); i++) {
+ Token token = tokens.get(i);
+ switch (token.getTokenType()) {
+ case DOUBLE_QUOTED:
+ case SINGLE_QUOTED:
+ if (i + 1 < tokens.size()
+ && tokens.get(i + 1).getTokenType() != TokenType.DELIMITER) {
+ int illegalPosition = tokens.get(i + 1).getPosition() - 1;
+ throw new IllegalArgumentException(
+ "Could not split string. Illegal quoting at position: "
+ + illegalPosition);
+ }
+ splits.add(token.getString());
+ break;
+ case UNQUOTED:
+ splits.add(token.getString());
+ break;
+ case DELIMITER:
+ if (i + 1 < tokens.size()
+ && tokens.get(i + 1).getTokenType() == TokenType.DELIMITER) {
+ splits.add("");
+ }
+ break;
+ }
+ }
+
+ return splits;
+ }
+
+ private static List tokenize(String string, char delimiter) {
+ final List tokens = new ArrayList<>();
+ final StringBuilder builder = new StringBuilder();
+ for (int cursor = 0; cursor < string.length(); ) {
+ final char c = string.charAt(cursor);
+
+ int nextChar = cursor + 1;
+ if (c == '\'') {
+ nextChar = consumeInQuotes(string, '\'', cursor, builder);
+ tokens.add(new Token(TokenType.SINGLE_QUOTED, builder.toString(), cursor));
+ } else if (c == '"') {
+ nextChar = consumeInQuotes(string, '"', cursor, builder);
+ tokens.add(new Token(TokenType.DOUBLE_QUOTED, builder.toString(), cursor));
+ } else if (c == delimiter) {
+ tokens.add(new Token(TokenType.DELIMITER, String.valueOf(c), cursor));
+ } else if (!Character.isWhitespace(c)) {
+ nextChar = consumeUnquoted(string, delimiter, cursor, builder);
+ tokens.add(new Token(TokenType.UNQUOTED, builder.toString().trim(), cursor));
+ }
+ builder.setLength(0);
+ cursor = nextChar;
+ }
+
+ return tokens;
+ }
+
+ private static int consumeInQuotes(
+ String string, char quote, int cursor, StringBuilder builder) {
+ for (int i = cursor + 1; i < string.length(); i++) {
+ char c = string.charAt(i);
+ if (c == quote) {
+ if (i + 1 < string.length() && string.charAt(i + 1) == quote) {
+ builder.append(c);
+ i += 1;
+ } else {
+ return i + 1;
+ }
+ } else {
+ builder.append(c);
+ }
+ }
+
+ throw new IllegalArgumentException(
+ "Could not split string. Quoting was not closed properly.");
+ }
+
+ private static int consumeUnquoted(
+ String string, char delimiter, int cursor, StringBuilder builder) {
+ int i;
+ for (i = cursor; i < string.length(); i++) {
+ char c = string.charAt(i);
+ if (c == delimiter) {
+ return i;
+ }
+
+ builder.append(c);
+ }
+
+ return i;
+ }
+
+ private enum TokenType {
+ DOUBLE_QUOTED,
+ SINGLE_QUOTED,
+ UNQUOTED,
+ DELIMITER
+ }
+
+ private static class Token {
+ private final TokenType tokenType;
+ private final String string;
+ private final int position;
+
+ private Token(TokenType tokenType, String string, int position) {
+ this.tokenType = tokenType;
+ this.string = string;
+ this.position = position;
+ }
+
+ public TokenType getTokenType() {
+ return tokenType;
+ }
+
+ public String getString() {
+ return string;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+ }
+
+ private StructuredOptionsSplitter() {}
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/exception/ConfigurationException.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/exception/ConfigurationException.java
new file mode 100644
index 00000000..880c395e
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/exception/ConfigurationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.exception;
+
+/** Exception to be thrown when any configuration error occurs. */
+public class ConfigurationException extends ShuffleException {
+
+ private static final long serialVersionUID = 5012483304677960591L;
+
+ public ConfigurationException(String message) {
+ super(message);
+ }
+
+ public ConfigurationException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/exception/ShuffleException.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/exception/ShuffleException.java
new file mode 100644
index 00000000..39fa4b0a
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/exception/ShuffleException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.exception;
+
+/** Basic checked exception type of the flink remote shuffle service. */
+public class ShuffleException extends RuntimeException {
+
+ private static final long serialVersionUID = 4354119805642345969L;
+
+ public ShuffleException(Throwable cause) {
+ super(cause);
+ }
+
+ public ShuffleException(String message) {
+ super(message);
+ }
+
+ public ShuffleException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/AutoCloseableAsync.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/AutoCloseableAsync.java
new file mode 100644
index 00000000..557b1dd4
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/AutoCloseableAsync.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.functions;
+
+import com.alibaba.flink.shuffle.common.exception.ShuffleException;
+import com.alibaba.flink.shuffle.common.utils.ExceptionUtils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/** Closeable interface which allows to close a resource in a non blocking fashion. */
+public interface AutoCloseableAsync extends AutoCloseable {
+
+ /**
+ * Trigger the closing of the resource and return the corresponding close future.
+ *
+ * @return Future which is completed once the resource has been closed
+ */
+ CompletableFuture closeAsync();
+
+ @Override
+ default void close() throws Exception {
+ try {
+ closeAsync().get();
+ } catch (ExecutionException exception) {
+ throw new ShuffleException(
+ "Could not close resource.",
+ ExceptionUtils.stripException(exception, ExecutionException.class));
+ }
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/BiConsumerWithException.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/BiConsumerWithException.java
new file mode 100644
index 00000000..37e4c0ca
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/BiConsumerWithException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.functions;
+
+/** Enhancement version of {@link java.util.function.BiConsumer} which can throw exceptions. */
+public interface BiConsumerWithException {
+ void accept(T var1, U var2) throws E;
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/ConsumerWithException.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/ConsumerWithException.java
new file mode 100644
index 00000000..3f636832
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/ConsumerWithException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.functions;
+
+/** Enhancement version of {@link java.util.function.Consumer} which can throw exceptions. */
+public interface ConsumerWithException {
+ void accept(T var) throws E;
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/RunnableWithException.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/RunnableWithException.java
new file mode 100644
index 00000000..740d2e7d
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/RunnableWithException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.functions;
+
+/** Enhancement version of {@link Runnable} which can throw exceptions. */
+public interface RunnableWithException {
+
+ void run() throws Exception;
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/SupplierWithException.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/SupplierWithException.java
new file mode 100644
index 00000000..98fc62a8
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/functions/SupplierWithException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.functions;
+
+/**
+ * A functional interface for a {@link java.util.function.Supplier} that may throw exceptions.
+ *
+ * @param The type of the result of the supplier.
+ * @param The type of Exceptions thrown by this function.
+ */
+@FunctionalInterface
+public interface SupplierWithException {
+
+ /**
+ * Gets the result of this supplier.
+ *
+ * @return The result of thus supplier.
+ * @throws E This function may throw an exception.
+ */
+ R get() throws E;
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/handler/FatalErrorHandler.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/handler/FatalErrorHandler.java
new file mode 100644
index 00000000..2dcf41b8
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/handler/FatalErrorHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.handler;
+
+/** Handler interface for fatal error. */
+public interface FatalErrorHandler {
+
+ void onFatalError(Throwable throwable);
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/CommonUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/CommonUtils.java
new file mode 100644
index 00000000..7ea01c21
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/CommonUtils.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import com.alibaba.flink.shuffle.common.functions.RunnableWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Supplier;
+
+/** Utility methods can be used by all modules. */
+public class CommonUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommonUtils.class);
+
+ private static final char[] HEX_CHARS = {
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+ };
+
+ private static final int DEFAULT_RETRY_TIMES = 3;
+
+ private static final ByteOrder DEFAULT_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+ /**
+ * Ensures that the target object is not null and returns it. It will throw {@link
+ * NullPointerException} if the target object is null.
+ */
+ public static T checkNotNull(T object) {
+ if (object == null) {
+ throw new NullPointerException("Must be not null.");
+ }
+ return object;
+ }
+
+ /**
+ * Check the legality of method arguments. It will throw {@link IllegalArgumentException} if the
+ * given condition is not true.
+ */
+ public static void checkArgument(boolean condition, @Nullable String message) {
+ if (!condition) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+
+ /**
+ * Check the legality of method arguments. It will throw {@link IllegalArgumentException} if the
+ * given condition is not true.
+ */
+ public static void checkArgument(boolean condition) {
+ if (!condition) {
+ throw new IllegalArgumentException("Illegal argument.");
+ }
+ }
+
+ /**
+ * Checks the legality of program state. It will throw {@link IllegalStateException} if the
+ * given condition is not true.
+ */
+ public static void checkState(boolean condition, @Nullable String message) {
+ if (!condition) {
+ throw new IllegalStateException(message);
+ }
+ }
+
+ /**
+ * Checks the legality of program state. It will throw {@link IllegalStateException} if the
+ * given condition is not true.
+ */
+ public static void checkState(boolean condition, Supplier message) {
+ if (!condition) {
+ throw new IllegalStateException(message.get());
+ }
+ }
+
+ /**
+ * Checks the legality of program state. It will throw {@link IllegalStateException} if the
+ * given condition is not true.
+ */
+ public static void checkState(boolean condition) {
+ if (!condition) {
+ throw new IllegalStateException("Illegal state.");
+ }
+ }
+
+ /** Exists the current process and logs the error when any unrecoverable exception occurs. */
+ public static void exitOnFatalError(Throwable throwable) {
+ LOG.error("Exiting on fatal error.", throwable);
+ FatalErrorExitUtils.exitProcessIfNeeded(-101);
+ }
+
+ /** Generates a random byte array of the given length. */
+ public static byte[] randomBytes(int length) {
+ checkArgument(length > 0, "Must be positive.");
+
+ Random random = new Random();
+ byte[] bytes = new byte[length];
+ random.nextBytes(bytes);
+ return bytes;
+ }
+
+ /** Converts the given byte array to a printable hex string. */
+ public static String bytesToHexString(byte[] bytes) {
+ checkArgument(bytes != null, "Must be not null.");
+
+ char[] chars = new char[bytes.length * 2];
+
+ for (int i = 0; i < chars.length; i += 2) {
+ int index = i >>> 1;
+ chars[i] = HEX_CHARS[(0xF0 & bytes[index]) >>> 4];
+ chars[i + 1] = HEX_CHARS[0x0F & bytes[index]];
+ }
+
+ return new String(chars);
+ }
+
+ public static byte[] hexStringToBytes(String hexString) {
+ byte[] bytes = new byte[hexString.length() / 2];
+ for (int i = 0; i < hexString.length(); i += 2) {
+ byte high = Byte.parseByte(hexString.charAt(i) + "", 16);
+ byte low = Byte.parseByte(hexString.charAt(i + 1) + "", 16);
+ bytes[i / 2] = (byte) ((high << 4) | low);
+ }
+
+ return bytes;
+ }
+
+ /** Generates a random hex string of the given length. */
+ public static String randomHexString(int length) {
+ checkArgument(length > 0, "Must be positive.");
+
+ char[] chars = new char[length];
+ Random random = new Random();
+
+ for (int i = 0; i < length; ++i) {
+ chars[i] = HEX_CHARS[random.nextInt(HEX_CHARS.length)];
+ }
+
+ return new String(chars);
+ }
+
+ public static byte[] concatByteArrays(byte[]... byteArrays) {
+ int totalLength =
+ Arrays.stream(byteArrays).map(array -> 4 + array.length).reduce(0, Integer::sum);
+ ByteBuffer buffer = allocateHeapByteBuffer(totalLength);
+ for (byte[] array : byteArrays) {
+ buffer.putInt(array.length);
+ buffer.put(array);
+ }
+
+ return buffer.array();
+ }
+
+ public static List splitByteArrays(byte[] concatArray) {
+ List arrays = new ArrayList<>();
+ ByteBuffer buffer = ByteBuffer.wrap(concatArray);
+ while (buffer.hasRemaining()) {
+ int length = buffer.getInt();
+ byte[] array = new byte[length];
+ buffer.get(array);
+ arrays.add(array);
+ }
+
+ return arrays;
+ }
+
+ public static byte[] intToBytes(int i) {
+ ByteBuffer bb = allocateHeapByteBuffer(4);
+ bb.putInt(i);
+ return bb.array();
+ }
+
+ public static byte[] longToBytes(long i) {
+ ByteBuffer bb = allocateHeapByteBuffer(8);
+ bb.putLong(i);
+ return bb.array();
+ }
+
+ public static boolean isValidHostPort(int port) {
+ return 0 <= port && port <= 65535;
+ }
+
+ /**
+ * Runs the given {@link RunnableWithException} in current thread silently and do nothing even
+ * when any exception occurs.
+ */
+ public static void runQuietly(@Nullable RunnableWithException runnable) {
+ runQuietly(runnable, false);
+ }
+
+ /**
+ * Runs the given {@link RunnableWithException} in current thread and may log the encountered
+ * exception if any.
+ */
+ public static void runQuietly(@Nullable RunnableWithException runnable, boolean logFailure) {
+ if (runnable == null) {
+ return;
+ }
+
+ try {
+ runnable.run();
+ } catch (Throwable throwable) {
+ if (logFailure) {
+ LOG.warn("Failed to run task.", throwable);
+ }
+ }
+ }
+
+ /**
+ * Closes the target {@link AutoCloseable} and retries a maximum of {@link #DEFAULT_RETRY_TIMES}
+ * times. It will throw exception if still fails after that.
+ */
+ public static void closeWithRetry(@Nullable AutoCloseable closeable) throws Exception {
+ closeWithRetry(closeable, DEFAULT_RETRY_TIMES);
+ }
+
+ /**
+ * Closes the target {@link AutoCloseable} and retries a maximum of the given times. It will
+ * throw exception if still fails after that.
+ */
+ public static void closeWithRetry(@Nullable AutoCloseable closeable, int retryTimes)
+ throws Exception {
+ Throwable exception = null;
+ for (int i = 0; i < retryTimes; ++i) {
+ try {
+ if (closeable != null) {
+ closeable.close();
+ }
+ return;
+ } catch (Throwable throwable) {
+ exception = exception != null ? exception : throwable;
+ }
+ }
+
+ if (exception != null) {
+ ExceptionUtils.rethrowException(exception);
+ }
+ }
+
+ /**
+ * Deletes the target file and retries a maximum of {@link #DEFAULT_RETRY_TIMES} times. It will
+ * throw exception if still fails after that.
+ */
+ public static void deleteFileWithRetry(@Nullable Path path) throws Exception {
+ deleteFileWithRetry(path, DEFAULT_RETRY_TIMES);
+ }
+
+ /**
+ * Deletes the target file and retries a maximum of the given times. It will throw exception if
+ * still fails after that.
+ */
+ public static void deleteFileWithRetry(@Nullable Path path, int retryTimes) throws Exception {
+ Throwable exception = null;
+ for (int i = 0; i < retryTimes; ++i) {
+ try {
+ if (path != null) {
+ Files.deleteIfExists(path);
+ }
+ return;
+ } catch (Throwable throwable) {
+ exception = exception != null ? exception : throwable;
+ }
+ }
+
+ if (exception != null) {
+ ExceptionUtils.rethrowException(exception);
+ }
+ }
+
+ /** Allocates a piece of unmanaged heap {@link ByteBuffer} of the given size. */
+ public static ByteBuffer allocateHeapByteBuffer(int size) {
+ checkArgument(size >= 0, "Must be non-negative.");
+
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ buffer.order(DEFAULT_BYTE_ORDER);
+ return buffer;
+ }
+
+ /** Allocates a piece of unmanaged direct {@link ByteBuffer} of the given size. */
+ public static ByteBuffer allocateDirectByteBuffer(int size) {
+ checkArgument(size >= 0, "Must be non-negative.");
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+ buffer.order(DEFAULT_BYTE_ORDER);
+ return buffer;
+ }
+
+ /** Casts the given long value to int and ensures there is no loss. */
+ public static int checkedDownCast(long value) {
+ int downCast = (int) value;
+ if ((long) downCast != value) {
+ throw new IllegalArgumentException(
+ "Cannot downcast long value " + value + " to integer.");
+ }
+
+ return downCast;
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ExceptionUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ExceptionUtils.java
new file mode 100644
index 00000000..6871299f
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ExceptionUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import javax.annotation.Nullable;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Optional;
+
+/** Utility for manipulating exceptions. */
+public class ExceptionUtils {
+
+ /**
+ * Checks whether the given exception indicates a situation that may leave the JVM in a
+ * corrupted state, meaning a state where continued normal operation can only be guaranteed via
+ * clean process restart.
+ */
+ public static boolean isJvmFatalError(Throwable t) {
+ return (t instanceof InternalError)
+ || (t instanceof UnknownError)
+ || (t instanceof ThreadDeath);
+ }
+
+ /**
+ * Checks whether the given exception indicates a situation that may leave the JVM in a
+ * corrupted state, or an out-of-memory error.
+ */
+ public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
+ return isJvmFatalError(t) || t instanceof OutOfMemoryError;
+ }
+
+ /**
+ * Checks whether the given exception indicates a JVM metaspace out-of-memory error.
+ *
+ * @param t The exception to check.
+ * @return True, if the exception is the metaspace {@link OutOfMemoryError}, false otherwise.
+ */
+ public static boolean isMetaspaceOutOfMemoryError(@Nullable Throwable t) {
+ return isOutOfMemoryErrorWithMessageStartingWith(t, "Metaspace");
+ }
+
+ /** Checks whether the given exception indicates a JVM direct out-of-memory error. */
+ public static boolean isDirectOutOfMemoryError(@Nullable Throwable t) {
+ return isOutOfMemoryErrorWithMessageStartingWith(t, "Direct buffer memory");
+ }
+
+ public static boolean isHeapSpaceOutOfMemoryError(@Nullable Throwable t) {
+ return isOutOfMemoryErrorWithMessageStartingWith(t, "Java heap space");
+ }
+
+ private static boolean isOutOfMemoryErrorWithMessageStartingWith(
+ @Nullable Throwable t, String prefix) {
+ // the exact matching of the class is checked to avoid matching any custom subclasses of
+ // OutOfMemoryError as we are interested in the original exceptions, generated by JVM.
+ return isOutOfMemoryError(t) && t.getMessage() != null && t.getMessage().startsWith(prefix);
+ }
+
+ private static boolean isOutOfMemoryError(@Nullable Throwable t) {
+ return t != null && t.getClass() == OutOfMemoryError.class;
+ }
+
+ /** Rethrows the target {@link Throwable} as {@link Error} or {@link RuntimeException}. */
+ public static void rethrowAsRuntimeException(Throwable t) {
+ if (t instanceof Error) {
+ throw (Error) t;
+ } else if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else {
+ throw new RuntimeException(t);
+ }
+ }
+
+ /** Rethrows the target {@link Throwable} as {@link Error} or {@link Exception}. */
+ public static void rethrowException(Throwable throwable) throws Exception {
+ if (throwable instanceof Error) {
+ throw (Error) throwable;
+ } else {
+ throw (Exception) throwable;
+ }
+ }
+
+ /**
+ * Unpacks an specified exception and returns its cause. Otherwise the given {@link Throwable}
+ * is returned.
+ */
+ public static Throwable stripException(
+ Throwable throwableToStrip, Class extends Throwable> typeToStrip) {
+ while (typeToStrip.isAssignableFrom(throwableToStrip.getClass())
+ && throwableToStrip.getCause() != null) {
+ throwableToStrip = throwableToStrip.getCause();
+ }
+ return throwableToStrip;
+ }
+
+ /** Checks whether a throwable chain contains a specific type of exception and returns it. */
+ public static Optional findThrowable(
+ Throwable throwable, Class searchType) {
+ if (throwable == null || searchType == null) {
+ return Optional.empty();
+ }
+
+ Throwable cause = throwable;
+ while (cause != null) {
+ if (searchType.isAssignableFrom(cause.getClass())) {
+ return Optional.of(searchType.cast(cause));
+ } else {
+ cause = cause.getCause();
+ }
+ }
+ return Optional.empty();
+ }
+
+ public static String summaryErrorMessageStack(Throwable t) {
+ StringBuilder sb = new StringBuilder();
+ do {
+ if (sb.length() != 0) {
+ sb.append(" -> ");
+ }
+ sb.append("[")
+ .append(t.getClass().getName())
+ .append(": ")
+ .append(t.getMessage())
+ .append("]");
+ } while ((t = t.getCause()) != null);
+ return sb.toString();
+ }
+
+ /**
+ * Makes a string representation of the exception's stack trace, or "(null)", if the exception
+ * is null.
+ */
+ public static String stringifyException(Throwable exception) {
+ if (exception == null) {
+ return "(null)";
+ }
+
+ try {
+ StringWriter stm = new StringWriter();
+ PrintWriter wrt = new PrintWriter(stm);
+ exception.printStackTrace(wrt);
+ wrt.close();
+ return stm.toString();
+ } catch (Throwable throwable) {
+ return exception.getClass().getName() + " (error while printing stack trace)";
+ }
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/FatalErrorExitUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/FatalErrorExitUtils.java
new file mode 100644
index 00000000..aa7ea934
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/FatalErrorExitUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * System.exit() need to be called when a fatal error is encountered. However, in some cases, such
+ * as deployment on yarn, System.exit() can't be called because it may affect other processes, e.g.,
+ * Yarn Node Manager process.
+ *
+ * In order to avoid affecting other processes, this class will use different strategies to deal
+ * with these fatal errors according to different deployment environments.
+ */
+public class FatalErrorExitUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(FatalErrorExitUtils.class);
+
+ private static volatile boolean needStopProcess = true;
+
+ public static void setNeedStopProcess(boolean needStopProcess) {
+ FatalErrorExitUtils.needStopProcess = needStopProcess;
+ }
+
+ /** Only for tests. */
+ public static boolean isNeedStopProcess() {
+ return needStopProcess;
+ }
+
+ public static void exitProcessIfNeeded(int exitCode) {
+ exitProcessIfNeeded(exitCode, null);
+ }
+
+ public static void exitProcessIfNeeded(int exitCode, Throwable t) {
+ StringBuilder sb =
+ new StringBuilder("Stopping the process with code ").append(exitCode).append(". ");
+ sb.append("Whether the process should be exit? ").append(needStopProcess).append(". ");
+ if (!needStopProcess) {
+ sb.append("Ignore the stop operation and return directly.");
+ }
+ if (t != null) {
+ LOG.error(sb.toString(), t);
+ } else {
+ LOG.error(sb.toString());
+ }
+
+ if (needStopProcess) {
+ System.exit(exitCode);
+ }
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/FutureUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/FutureUtils.java
new file mode 100644
index 00000000..0e2b5176
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/FutureUtils.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+/**
+ * A collection of utilities that expand the usage of {@link CompletableFuture}.
+ *
+ *
This class is partly copied from Apache Flink
+ * (org.apache.flink.runtime.concurrent.FutureUtils).
+ */
+public class FutureUtils {
+
+ /**
+ * Run the given asynchronous action after the completion of the given future. The given future
+ * can be completed normally or exceptionally. In case of an exceptional completion, the
+ * asynchronous action's exception will be added to the initial exception.
+ *
+ * @param future to wait for its completion
+ * @param composedAction asynchronous action which is triggered after the future's completion
+ * @return Future which is completed after the asynchronous action has completed. This future
+ * can contain an exception if an error occurred in the given future or asynchronous action.
+ */
+ public static CompletableFuture composeAfterwards(
+ CompletableFuture> future, Supplier> composedAction) {
+ final CompletableFuture resultFuture = new CompletableFuture<>();
+
+ future.whenComplete(
+ (Object outerIgnored, Throwable outerThrowable) -> {
+ final CompletableFuture> composedActionFuture = composedAction.get();
+
+ composedActionFuture.whenComplete(
+ (Object innerIgnored, Throwable innerThrowable) -> {
+ if (innerThrowable != null) {
+ resultFuture.completeExceptionally(
+ outerThrowable == null
+ ? innerThrowable
+ : outerThrowable);
+ } else if (outerThrowable != null) {
+ resultFuture.completeExceptionally(outerThrowable);
+ } else {
+ resultFuture.complete(null);
+ }
+ });
+ });
+
+ return resultFuture;
+ }
+
+ /**
+ * Creates a {@link ConjunctFuture} which is only completed after all given futures have
+ * completed. Unlike {@link FutureUtils#waitForAll(Collection)}, the resulting future won't be
+ * completed directly if one of the given futures is completed exceptionally. Instead, all
+ * occurring exception will be collected and combined to a single exception. If at least on
+ * exception occurs, then the resulting future will be completed exceptionally.
+ *
+ * @param futuresToComplete futures to complete
+ * @return Future which is completed after all given futures have been completed.
+ */
+ public static ConjunctFuture completeAll(
+ Collection extends CompletableFuture>> futuresToComplete) {
+ return new CompletionConjunctFuture(futuresToComplete);
+ }
+
+ /**
+ * Returns an exceptionally completed {@link CompletableFuture}.
+ *
+ * @param cause to complete the future with
+ * @param type of the future
+ * @return An exceptionally completed CompletableFuture
+ */
+ public static CompletableFuture completedExceptionally(Throwable cause) {
+ CompletableFuture result = new CompletableFuture<>();
+ result.completeExceptionally(cause);
+ return result;
+ }
+
+ /**
+ * Creates a future that is complete once multiple other futures completed. The future fails
+ * (completes exceptionally) once one of the futures in the conjunction fails. Upon successful
+ * completion, the future returns the collection of the futures' results.
+ *
+ * The ConjunctFuture gives access to how many Futures in the conjunction have already
+ * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+ *
+ * @param futures The futures that make up the conjunction. No null entries are allowed.
+ * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
+ */
+ public static ConjunctFuture> combineAll(
+ Collection extends CompletableFuture extends T>> futures) {
+ CommonUtils.checkArgument(futures != null, "Must be not null.");
+ return new ResultConjunctFuture<>(futures);
+ }
+
+ /**
+ * Creates a future that is complete once all of the given futures have completed. The future
+ * fails (completes exceptionally) once one of the given futures fails.
+ *
+ * The ConjunctFuture gives access to how many Futures have already completed successfully,
+ * via {@link ConjunctFuture#getNumFuturesCompleted()}.
+ *
+ * @param futures The futures to wait on. No null entries are allowed.
+ * @return The WaitingFuture that completes once all given futures are complete (or one fails).
+ */
+ public static ConjunctFuture waitForAll(
+ Collection extends CompletableFuture>> futures) {
+ CommonUtils.checkArgument(futures != null, "Must be not null.");
+ return new WaitingConjunctFuture(futures);
+ }
+
+ /**
+ * A future that is complete once multiple other futures completed. The futures are not
+ * necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once one of
+ * the Futures in the conjunction fails.
+ *
+ * The advantage of using the ConjunctFuture over chaining all the futures (such as via
+ * {@link CompletableFuture#thenCombine(CompletionStage, BiFunction)} )}) is that ConjunctFuture
+ * also tracks how many of the Futures are already complete.
+ */
+ public abstract static class ConjunctFuture extends CompletableFuture {
+
+ /**
+ * Gets the total number of Futures in the conjunction.
+ *
+ * @return The total number of Futures in the conjunction.
+ */
+ public abstract int getNumFuturesTotal();
+
+ /**
+ * Gets the number of Futures in the conjunction that are already complete.
+ *
+ * @return The number of Futures in the conjunction that are already complete
+ */
+ public abstract int getNumFuturesCompleted();
+ }
+
+ /**
+ * Implementation of the {@link ConjunctFuture} interface which waits only for the completion of
+ * its futures and does not return their values.
+ */
+ private static final class WaitingConjunctFuture extends ConjunctFuture {
+
+ /** Number of completed futures. */
+ private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+ /** Total number of futures to wait on. */
+ private final int numTotal;
+
+ /**
+ * Method which increments the atomic completion counter and completes or fails the
+ * WaitingFutureImpl.
+ */
+ private void handleCompletedFuture(Object ignored, Throwable throwable) {
+ if (throwable == null) {
+ if (numTotal == numCompleted.incrementAndGet()) {
+ complete(null);
+ }
+ } else {
+ completeExceptionally(throwable);
+ }
+ }
+
+ private WaitingConjunctFuture(Collection extends CompletableFuture>> futures) {
+ this.numTotal = futures.size();
+
+ if (futures.isEmpty()) {
+ complete(null);
+ } else {
+ for (java.util.concurrent.CompletableFuture> future : futures) {
+ future.whenComplete(this::handleCompletedFuture);
+ }
+ }
+ }
+
+ @Override
+ public int getNumFuturesTotal() {
+ return numTotal;
+ }
+
+ @Override
+ public int getNumFuturesCompleted() {
+ return numCompleted.get();
+ }
+ }
+
+ /**
+ * The implementation of the {@link ConjunctFuture} which returns its Futures' result as a
+ * collection.
+ */
+ private static class ResultConjunctFuture extends ConjunctFuture> {
+
+ /** The total number of futures in the conjunction. */
+ private final int numTotal;
+
+ /** The number of futures in the conjunction that are already complete. */
+ private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+ /** The set of collected results so far. */
+ private final T[] results;
+
+ /**
+ * The function that is attached to all futures in the conjunction. Once a future is
+ * complete, this function tracks the completion or fails the conjunct.
+ */
+ private void handleCompletedFuture(int index, T value, Throwable throwable) {
+ if (throwable != null) {
+ completeExceptionally(throwable);
+ } else {
+ /**
+ * This {@link #results} update itself is not synchronised in any way and it's fine
+ * because:
+ *
+ *
+ * - There is a happens-before relationship for each thread (that is completing
+ * the future) between setting {@link #results} and incrementing {@link
+ * #numCompleted}.
+ *
- Each thread is updating uniquely different field of the {@link #results}
+ * array.
+ *
- There is a happens-before relationship between all of the writing threads
+ * and the last one thread (thanks to the {@code
+ * numCompleted.incrementAndGet() == numTotal} check.
+ *
- The last thread will be completing the future, so it has transitively
+ * happens-before relationship with all of preceding updated/writes to {@link
+ * #results}.
+ *
- {@link AtomicInteger#incrementAndGet} is an equivalent of both volatile
+ * read & write
+ *
+ */
+ results[index] = value;
+
+ if (numCompleted.incrementAndGet() == numTotal) {
+ complete(Arrays.asList(results));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ ResultConjunctFuture(Collection extends CompletableFuture extends T>> resultFutures) {
+ this.numTotal = resultFutures.size();
+ results = (T[]) new Object[numTotal];
+
+ if (resultFutures.isEmpty()) {
+ complete(Collections.emptyList());
+ } else {
+ int counter = 0;
+ for (CompletableFuture extends T> future : resultFutures) {
+ final int index = counter;
+ counter++;
+ future.whenComplete(
+ (value, throwable) -> handleCompletedFuture(index, value, throwable));
+ }
+ }
+ }
+
+ @Override
+ public int getNumFuturesTotal() {
+ return numTotal;
+ }
+
+ @Override
+ public int getNumFuturesCompleted() {
+ return numCompleted.get();
+ }
+ }
+
+ /**
+ * {@link ConjunctFuture} implementation which is completed after all the given futures have
+ * been completed. Exceptional completions of the input futures will be recorded but it won't
+ * trigger the early completion of this future.
+ */
+ private static final class CompletionConjunctFuture extends ConjunctFuture {
+
+ private final Object lock = new Object();
+
+ private final int numFuturesTotal;
+
+ private int futuresCompleted;
+
+ private Throwable globalThrowable;
+
+ private CompletionConjunctFuture(
+ Collection extends CompletableFuture>> futuresToComplete) {
+ numFuturesTotal = futuresToComplete.size();
+
+ futuresCompleted = 0;
+
+ globalThrowable = null;
+
+ if (futuresToComplete.isEmpty()) {
+ complete(null);
+ } else {
+ for (CompletableFuture> completableFuture : futuresToComplete) {
+ completableFuture.whenComplete(this::completeFuture);
+ }
+ }
+ }
+
+ private void completeFuture(Object ignored, Throwable throwable) {
+ synchronized (lock) {
+ futuresCompleted++;
+
+ if (throwable != null) {
+ globalThrowable = globalThrowable == null ? throwable : globalThrowable;
+ }
+
+ if (futuresCompleted == numFuturesTotal) {
+ if (globalThrowable != null) {
+ completeExceptionally(globalThrowable);
+ } else {
+ complete(null);
+ }
+ }
+ }
+ }
+
+ @Override
+ public int getNumFuturesTotal() {
+ return numFuturesTotal;
+ }
+
+ @Override
+ public int getNumFuturesCompleted() {
+ synchronized (lock) {
+ return futuresCompleted;
+ }
+ }
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/Hardware.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/Hardware.java
new file mode 100644
index 00000000..c01aa3d8
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/Hardware.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Convenience class to extract hardware specifics of the computer executing the running JVM.
+ *
+ * This class is copied from Apache Flink (org.apache.flink.runtime.util.Hardware).
+ */
+public class Hardware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hardware.class);
+
+ private static final String LINUX_MEMORY_INFO_PATH = "/proc/meminfo";
+
+ private static final Pattern LINUX_MEMORY_REGEX =
+ Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the size of the physical memory in bytes.
+ *
+ * @return the size of the physical memory in bytes or {@code -1}, if the size could not be
+ * determined.
+ */
+ public static long getSizeOfPhysicalMemory() {
+ // first try if the JVM can directly tell us what the system memory is
+ // this works only on Oracle JVMs
+ try {
+ Class> clazz = Class.forName("com.sun.management.OperatingSystemMXBean");
+ Method method = clazz.getMethod("getTotalPhysicalMemorySize");
+ OperatingSystemMXBean operatingSystemMXBean =
+ ManagementFactory.getOperatingSystemMXBean();
+
+ // someone may install different beans, so we need to check whether the bean
+ // is in fact the sun management bean
+ if (clazz.isInstance(operatingSystemMXBean)) {
+ return (Long) method.invoke(operatingSystemMXBean);
+ }
+ } catch (ClassNotFoundException e) {
+ // this happens on non-Oracle JVMs, do nothing and use the alternative code paths
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ LOG.warn(
+ "Access to physical memory size: "
+ + "com.sun.management.OperatingSystemMXBean incompatibly changed.",
+ e);
+ }
+
+ // we now try the OS specific access paths
+ switch (OperatingSystem.getCurrentOperatingSystem()) {
+ case LINUX:
+ return getSizeOfPhysicalMemoryForLinux();
+
+ case WINDOWS:
+ return getSizeOfPhysicalMemoryForWindows();
+
+ case MAC_OS:
+ return getSizeOfPhysicalMemoryForMac();
+
+ case FREE_BSD:
+ return getSizeOfPhysicalMemoryForFreeBSD();
+
+ case UNKNOWN:
+ LOG.error("Cannot determine size of physical memory for unknown operating system");
+ return -1;
+
+ default:
+ LOG.error("Unrecognized OS: " + OperatingSystem.getCurrentOperatingSystem());
+ return -1;
+ }
+ }
+
+ /**
+ * Returns the size of the physical memory in bytes on a Linux-based operating system.
+ *
+ * @return the size of the physical memory in bytes or {@code -1}, if the size could not be
+ * determined
+ */
+ private static long getSizeOfPhysicalMemoryForLinux() {
+ try (BufferedReader lineReader =
+ new BufferedReader(new FileReader(LINUX_MEMORY_INFO_PATH))) {
+ String line;
+ while ((line = lineReader.readLine()) != null) {
+ Matcher matcher = LINUX_MEMORY_REGEX.matcher(line);
+ if (matcher.matches()) {
+ String totalMemory = matcher.group(1);
+ return Long.parseLong(totalMemory) * 1024L; // Convert from kilobyte to byte
+ }
+ }
+ // expected line did not come
+ LOG.error(
+ "Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). "
+ + "Unexpected format.");
+ return -1;
+ } catch (NumberFormatException e) {
+ LOG.error(
+ "Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). "
+ + "Unexpected format.");
+ return -1;
+ } catch (Throwable t) {
+ LOG.error(
+ "Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo') ",
+ t);
+ return -1;
+ }
+ }
+
+ /**
+ * Returns the size of the physical memory in bytes on a Mac OS-based operating system.
+ *
+ * @return the size of the physical memory in bytes or {@code -1}, if the size could not be
+ * determined
+ */
+ private static long getSizeOfPhysicalMemoryForMac() {
+ BufferedReader bi = null;
+ try {
+ Process proc = Runtime.getRuntime().exec("sysctl hw.memsize");
+
+ bi =
+ new BufferedReader(
+ new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8));
+
+ String line;
+ while ((line = bi.readLine()) != null) {
+ if (line.startsWith("hw.memsize")) {
+ long memsize = Long.parseLong(line.split(":")[1].trim());
+ bi.close();
+ proc.destroy();
+ return memsize;
+ }
+ }
+
+ } catch (Throwable t) {
+ LOG.error("Cannot determine physical memory of machine for MacOS host", t);
+ return -1;
+ } finally {
+ if (bi != null) {
+ try {
+ bi.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Returns the size of the physical memory in bytes on FreeBSD.
+ *
+ * @return the size of the physical memory in bytes or {@code -1}, if the size could not be
+ * determined
+ */
+ private static long getSizeOfPhysicalMemoryForFreeBSD() {
+ BufferedReader bi = null;
+ try {
+ Process proc = Runtime.getRuntime().exec("sysctl hw.physmem");
+
+ bi =
+ new BufferedReader(
+ new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8));
+
+ String line;
+ while ((line = bi.readLine()) != null) {
+ if (line.startsWith("hw.physmem")) {
+ long memsize = Long.parseLong(line.split(":")[1].trim());
+ bi.close();
+ proc.destroy();
+ return memsize;
+ }
+ }
+
+ LOG.error(
+ "Cannot determine the size of the physical memory for FreeBSD host "
+ + "(using 'sysctl hw.physmem').");
+ return -1;
+ } catch (Throwable t) {
+ LOG.error(
+ "Cannot determine the size of the physical memory for FreeBSD host "
+ + "(using 'sysctl hw.physmem')",
+ t);
+ return -1;
+ } finally {
+ if (bi != null) {
+ try {
+ bi.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the size of the physical memory in bytes on Windows.
+ *
+ * @return the size of the physical memory in bytes or {@code -1}, if the size could not be
+ * determined
+ */
+ private static long getSizeOfPhysicalMemoryForWindows() {
+ BufferedReader bi = null;
+ try {
+ Process proc = Runtime.getRuntime().exec("wmic memorychip get capacity");
+
+ bi =
+ new BufferedReader(
+ new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8));
+
+ String line = bi.readLine();
+ if (line == null) {
+ return -1L;
+ }
+
+ if (!line.startsWith("Capacity")) {
+ return -1L;
+ }
+
+ long sizeOfPhyiscalMemory = 0L;
+ while ((line = bi.readLine()) != null) {
+ if (line.isEmpty()) {
+ continue;
+ }
+
+ line = line.replaceAll(" ", "");
+ sizeOfPhyiscalMemory += Long.parseLong(line);
+ }
+ return sizeOfPhyiscalMemory;
+ } catch (Throwable t) {
+ LOG.error(
+ "Cannot determine the size of the physical memory for Windows host "
+ + "(using 'wmic memorychip')",
+ t);
+ return -1L;
+ } finally {
+ if (bi != null) {
+ try {
+ bi.close();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private Hardware() {}
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/JvmShutdownSafeguard.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/JvmShutdownSafeguard.java
new file mode 100644
index 00000000..cc0f57a8
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/JvmShutdownSafeguard.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.slf4j.Logger;
+
+import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkArgument;
+
+/**
+ * A utility that guards against blocking shutdown hooks that block JVM shutdown.
+ *
+ *
When the JVM shuts down cleanly (SIGTERM or {@link System#exit(int)}) it runs all
+ * installed shutdown hooks. It is possible that any of the shutdown hooks blocks, which causes the
+ * JVM to get stuck and not exit at all.
+ *
+ *
This utility installs a shutdown hook that forcibly terminates the JVM if it is still alive a
+ * certain time after clean shutdown was initiated. Even if some shutdown hooks block, the JVM will
+ * terminate within a certain time.
+ *
+ *
This class is copied from Apache Flink (org.apache.flink.runtime.util.JvmShutdownSafeguard).
+ */
+public class JvmShutdownSafeguard extends Thread {
+
+ /**
+ * Default delay to wait after clean shutdown was stared, before forcibly terminating the JVM.
+ */
+ private static final long DEFAULT_DELAY = 5000L;
+
+ /** The exit code returned by the JVM process if it is killed by the safeguard. */
+ private static final int EXIT_CODE = -17;
+
+ /** The thread that actually does the termination. */
+ private final Thread terminator;
+
+ private JvmShutdownSafeguard(long delayMillis) {
+ setName("JVM Terminator Launcher");
+
+ this.terminator = new Thread(new DelayedTerminator(delayMillis), "Jvm Terminator");
+ this.terminator.setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ // Because this thread is registered as a shutdown hook, we cannot
+ // wait here and then call for termination. That would always delay the JVM shutdown.
+ // Instead, we spawn a non shutdown hook thread from here.
+ // That thread is a daemon, so it does not keep the JVM alive.
+ terminator.start();
+ }
+
+ // ------------------------------------------------------------------------
+ // The actual Shutdown thread
+ // ------------------------------------------------------------------------
+
+ private static class DelayedTerminator implements Runnable {
+
+ private final long delayMillis;
+
+ private DelayedTerminator(long delayMillis) {
+ this.delayMillis = delayMillis;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(delayMillis);
+ } catch (Throwable t) {
+ // catch all, including thread death, etc
+ }
+
+ Runtime.getRuntime().halt(EXIT_CODE);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Installing as a shutdown hook
+ // ------------------------------------------------------------------------
+
+ /**
+ * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend on
+ * shutdown before being killed is five seconds.
+ *
+ * @param logger The logger to log errors to.
+ */
+ public static void installAsShutdownHook(Logger logger) {
+ installAsShutdownHook(logger, DEFAULT_DELAY);
+ }
+
+ /**
+ * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend on
+ * shutdown before being killed is the given number of milliseconds.
+ *
+ * @param logger The logger to log errors to.
+ * @param delayMillis The delay (in milliseconds) to wait after clean shutdown was stared,
+ * before forcibly terminating the JVM.
+ */
+ public static void installAsShutdownHook(Logger logger, long delayMillis) {
+ checkArgument(delayMillis >= 0, "delay must be >= 0");
+
+ // install the blocking shutdown hook
+ Thread shutdownHook = new JvmShutdownSafeguard(delayMillis);
+ ShutdownHookUtil.addShutdownHookThread(
+ shutdownHook, JvmShutdownSafeguard.class.getSimpleName(), logger);
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/OperatingSystem.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/OperatingSystem.java
new file mode 100644
index 00000000..8028dec1
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/OperatingSystem.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+/**
+ * An enumeration indicating the operating system that the JVM runs on.
+ *
+ *
This class is copied from Apache Flink (org.apache.flink.util.OperatingSystem).
+ */
+public enum OperatingSystem {
+ LINUX,
+ WINDOWS,
+ MAC_OS,
+ FREE_BSD,
+ SOLARIS,
+ UNKNOWN;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the operating system that the JVM runs on from the java system properties. this method
+ * returns UNKNOWN, if the operating system was not successfully determined.
+ *
+ * @return The enum constant for the operating system, or UNKNOWN, if it was not
+ * possible to determine.
+ */
+ public static OperatingSystem getCurrentOperatingSystem() {
+ return os;
+ }
+
+ /**
+ * Checks whether the operating system this JVM runs on is Windows.
+ *
+ * @return true
if the operating system this JVM runs on is Windows, false
+ *
otherwise
+ */
+ public static boolean isWindows() {
+ return getCurrentOperatingSystem() == WINDOWS;
+ }
+
+ /**
+ * Checks whether the operating system this JVM runs on is Linux.
+ *
+ * @return true
if the operating system this JVM runs on is Linux, false
+ *
otherwise
+ */
+ public static boolean isLinux() {
+ return getCurrentOperatingSystem() == LINUX;
+ }
+
+ /**
+ * Checks whether the operating system this JVM runs on is Windows.
+ *
+ * @return true
if the operating system this JVM runs on is Windows, false
+ *
otherwise
+ */
+ public static boolean isMac() {
+ return getCurrentOperatingSystem() == MAC_OS;
+ }
+
+ /**
+ * Checks whether the operating system this JVM runs on is FreeBSD.
+ *
+ * @return true
if the operating system this JVM runs on is FreeBSD, false
+ *
otherwise
+ */
+ public static boolean isFreeBSD() {
+ return getCurrentOperatingSystem() == FREE_BSD;
+ }
+
+ /**
+ * Checks whether the operating system this JVM runs on is Solaris.
+ *
+ * @return true
if the operating system this JVM runs on is Solaris, false
+ *
otherwise
+ */
+ public static boolean isSolaris() {
+ return getCurrentOperatingSystem() == SOLARIS;
+ }
+
+ /** The enum constant for the operating system. */
+ private static final OperatingSystem os = readOSFromSystemProperties();
+
+ /**
+ * Parses the operating system that the JVM runs on from the java system properties. If the
+ * operating system was not successfully determined, this method returns {@code UNKNOWN}.
+ *
+ * @return The enum constant for the operating system, or {@code UNKNOWN}, if it was not
+ * possible to determine.
+ */
+ private static OperatingSystem readOSFromSystemProperties() {
+ String osName = System.getProperty(OS_KEY);
+
+ if (osName.startsWith(LINUX_OS_PREFIX)) {
+ return LINUX;
+ }
+ if (osName.startsWith(WINDOWS_OS_PREFIX)) {
+ return WINDOWS;
+ }
+ if (osName.startsWith(MAC_OS_PREFIX)) {
+ return MAC_OS;
+ }
+ if (osName.startsWith(FREEBSD_OS_PREFIX)) {
+ return FREE_BSD;
+ }
+ String osNameLowerCase = osName.toLowerCase();
+ if (osNameLowerCase.contains(SOLARIS_OS_INFIX_1)
+ || osNameLowerCase.contains(SOLARIS_OS_INFIX_2)) {
+ return SOLARIS;
+ }
+
+ return UNKNOWN;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Constants to extract the OS type from the java environment
+ // --------------------------------------------------------------------------------------------
+
+ /** The key to extract the operating system name from the system properties. */
+ private static final String OS_KEY = "os.name";
+
+ /** The expected prefix for Linux operating systems. */
+ private static final String LINUX_OS_PREFIX = "Linux";
+
+ /** The expected prefix for Windows operating systems. */
+ private static final String WINDOWS_OS_PREFIX = "Windows";
+
+ /** The expected prefix for Mac OS operating systems. */
+ private static final String MAC_OS_PREFIX = "Mac";
+
+ /** The expected prefix for FreeBSD. */
+ private static final String FREEBSD_OS_PREFIX = "FreeBSD";
+
+ /** One expected infix for Solaris. */
+ private static final String SOLARIS_OS_INFIX_1 = "sunos";
+
+ /** One expected infix for Solaris. */
+ private static final String SOLARIS_OS_INFIX_2 = "solaris";
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ProcessUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ProcessUtils.java
new file mode 100644
index 00000000..a9279233
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ProcessUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import java.lang.management.ManagementFactory;
+
+/** Utilities related to the properties of the processes. */
+public class ProcessUtils {
+
+ public static int getProcessID() {
+ return Integer.parseInt(ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ProtocolUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ProtocolUtils.java
new file mode 100644
index 00000000..144831b5
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ProtocolUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import static com.alibaba.flink.shuffle.common.utils.StringUtils.stringToBytes;
+
+/** Utils to handle versions and protocol compatibility. */
+public class ProtocolUtils {
+
+ private static final int CURRENT_VERSION = 0;
+
+ private static final int EMPTY_BUFFER_SIZE = Integer.MAX_VALUE;
+
+ private static final long EMPTY_OFFSET = -2;
+
+ private static final String EMPTY_EXTRA_MESSAGE = "{}";
+
+ private static final String EMPTY_DATA_PARTITION_TYPE_FACTORY = "emptyDataPartitionTypeFactory";
+
+ /** Returns the current protocol version used. */
+ public static int currentProtocolVersion() {
+ return CURRENT_VERSION;
+ }
+
+ /** Returns the number of empty byte buffers. */
+ public static int emptyBufferSize() {
+ return EMPTY_BUFFER_SIZE;
+ }
+
+ /** Returns the empty offset value. */
+ public static long emptyOffset() {
+ return EMPTY_OFFSET;
+ }
+
+ /** Returns the empty string of data partition type factory. */
+ public static String emptyDataPartitionType() {
+ return EMPTY_DATA_PARTITION_TYPE_FACTORY;
+ }
+
+ /** Returns the empty extra message. */
+ public static String emptyExtraMessage() {
+ return EMPTY_EXTRA_MESSAGE;
+ }
+
+ /** Returns the empty extra message bytes. */
+ public static byte[] emptyExtraMessageBytes() {
+ return stringToBytes(EMPTY_EXTRA_MESSAGE);
+ }
+
+ /**
+ * Returns true if the client protocol version is compatible with the server protocol version,
+ * including both control flow and data flow. This method is used at the server side.
+ */
+ public static boolean isClientProtocolCompatible(int clientProtocolVersion) {
+ return compatibleVersion() <= clientProtocolVersion;
+ }
+
+ /**
+ * Returns true if the client protocol version is compatible with the server protocol version,
+ * including both control flow and data flow. This method is used at the client side.
+ */
+ public static boolean isServerProtocolCompatible(
+ int serverProtocolVersion, int serverCompatibleVersion) {
+ return currentProtocolVersion() <= serverProtocolVersion
+ && serverCompatibleVersion <= currentProtocolVersion();
+ }
+
+ /** Returns the minimum supported version compatible with the current version. */
+ public static int compatibleVersion() {
+ return 0;
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ShutdownHookUtil.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ShutdownHookUtil.java
new file mode 100644
index 00000000..5af7466c
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/ShutdownHookUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.slf4j.Logger;
+
+import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkNotNull;
+
+/**
+ * Utils class for dealing with JVM shutdown hooks.
+ *
+ *
This class is copied from Apache Flink (org.apache.flink.util.ShutdownHookUtil).
+ */
+public class ShutdownHookUtil {
+
+ /** Adds a shutdown hook to the JVM and returns the Thread, which has been registered. */
+ public static Thread addShutdownHook(
+ final AutoCloseable service, final String serviceName, final Logger logger) {
+
+ checkNotNull(service);
+ checkNotNull(logger);
+
+ final Thread shutdownHook =
+ new Thread(
+ () -> {
+ try {
+ service.close();
+ } catch (Throwable t) {
+ logger.error(
+ "Error during shutdown of {} via JVM shutdown hook.",
+ serviceName,
+ t);
+ }
+ },
+ serviceName + " shutdown hook");
+
+ return addShutdownHookThread(shutdownHook, serviceName, logger) ? shutdownHook : null;
+ }
+
+ /**
+ * Adds a shutdown hook to the JVM.
+ *
+ * @param shutdownHook Shutdown hook to be registered.
+ * @param serviceName The name of service.
+ * @param logger The logger to log.
+ * @return Whether the hook has been successfully registered.
+ */
+ public static boolean addShutdownHookThread(
+ final Thread shutdownHook, final String serviceName, final Logger logger) {
+
+ checkNotNull(shutdownHook);
+ checkNotNull(logger);
+
+ try {
+ // Add JVM shutdown hook to call shutdown of service
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ return true;
+ } catch (IllegalStateException e) {
+ // JVM is already shutting down. no need to do our work
+ } catch (Throwable t) {
+ logger.error(
+ "Cannot register shutdown hook that cleanly terminates {}.", serviceName, t);
+ }
+ return false;
+ }
+
+ /** Removes a shutdown hook from the JVM. */
+ public static void removeShutdownHook(
+ final Thread shutdownHook, final String serviceName, final Logger logger) {
+
+ // Do not run if this is invoked by the shutdown hook itself
+ if (shutdownHook == null || shutdownHook == Thread.currentThread()) {
+ return;
+ }
+
+ checkNotNull(logger);
+
+ try {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ } catch (IllegalStateException e) {
+ // race, JVM is in shutdown already, we can safely ignore this
+ logger.debug(
+ "Unable to remove shutdown hook for {}, shutdown already in progress",
+ serviceName,
+ e);
+ } catch (Throwable t) {
+ logger.warn("Exception while un-registering {}'s shutdown hook.", serviceName, t);
+ }
+ }
+
+ private ShutdownHookUtil() {
+ throw new AssertionError();
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/SignalHandler.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/SignalHandler.java
new file mode 100644
index 00000000..938a2773
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/SignalHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.slf4j.Logger;
+import sun.misc.Signal;
+
+/**
+ * This signal handler / signal logger is based on Apache Hadoop's
+ * org.apache.hadoop.util.SignalLogger.
+ *
+ *
This class is copied from Apache Flink (org.apache.flink.runtime.util.SignalHandler).
+ */
+public class SignalHandler {
+
+ private static boolean registered = false;
+
+ /** Our signal handler. */
+ private static class Handler implements sun.misc.SignalHandler {
+
+ private final Logger log;
+ private final sun.misc.SignalHandler prevHandler;
+
+ Handler(String name, Logger log) {
+ this.log = log;
+ prevHandler = Signal.handle(new Signal(name), this);
+ }
+
+ /**
+ * Handle an incoming signal.
+ *
+ * @param signal The incoming signal
+ */
+ @Override
+ public void handle(Signal signal) {
+ log.info(
+ "RECEIVED SIGNAL {}: SIG{}. Shutting down as requested.",
+ signal.getNumber(),
+ signal.getName());
+ prevHandler.handle(signal);
+ }
+ }
+
+ /**
+ * Register some signal handlers.
+ *
+ * @param log The slf4j logger
+ */
+ public static void register(final Logger log) {
+ synchronized (SignalHandler.class) {
+ if (registered) {
+ return;
+ }
+ registered = true;
+
+ final String[] signals =
+ OperatingSystem.isWindows()
+ ? new String[] {"TERM", "INT"}
+ : new String[] {"TERM", "HUP", "INT"};
+
+ StringBuilder bld = new StringBuilder();
+ bld.append("Registered UNIX signal handlers for [");
+
+ String separator = "";
+ for (String signalName : signals) {
+ try {
+ new Handler(signalName, log);
+ bld.append(separator);
+ bld.append(signalName);
+ separator = ", ";
+ } catch (Exception e) {
+ log.info("Error while registering signal handler", e);
+ }
+ }
+ bld.append("]");
+ log.info(bld.toString());
+ }
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/SingleThreadExecutorValidator.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/SingleThreadExecutorValidator.java
new file mode 100644
index 00000000..21867061
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/SingleThreadExecutorValidator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkNotNull;
+
+/**
+ * A tool class to ensure the current method is running in the thread of a specified single thread
+ * executor.
+ */
+public class SingleThreadExecutorValidator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SingleThreadExecutorValidator.class);
+
+ private final Thread targetThread;
+
+ public SingleThreadExecutorValidator(Executor executor) {
+ CompletableFuture targetThreadFuture = new CompletableFuture<>();
+ executor.execute(() -> targetThreadFuture.complete(Thread.currentThread()));
+ targetThread = checkNotNull(targetThreadFuture.join());
+ }
+
+ public SingleThreadExecutorValidator(Thread targetThread) {
+ this.targetThread = targetThread;
+ }
+
+ public void assertRunningInTargetThread() {
+ if (Thread.currentThread() != targetThread) {
+ RuntimeException exception =
+ new RuntimeException(
+ "Expected running in "
+ + targetThread
+ + ", but running in "
+ + Thread.currentThread());
+ LOG.warn("Validate Failed", exception);
+ throw exception;
+ }
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/StringUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/StringUtils.java
new file mode 100644
index 00000000..594219c4
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/StringUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Utilities to handle {@link String} operations. */
+public class StringUtils {
+
+ public static final Charset UTF_8 = StandardCharsets.UTF_8;
+
+ public static String bytesToString(byte[] inputBytes) {
+ return new String(inputBytes, UTF_8);
+ }
+
+ public static byte[] stringToBytes(String inputString) {
+ return inputString.getBytes(UTF_8);
+ }
+
+ /**
+ * Checks if the string is null, empty, or contains only whitespace characters. A whitespace
+ * character is defined via {@link Character#isWhitespace(char)}.
+ */
+ public static boolean isNullOrWhitespaceOnly(String string) {
+ if (string == null || string.length() == 0) {
+ return true;
+ }
+
+ final int len = string.length();
+ for (int i = 0; i < len; i++) {
+ if (!Character.isWhitespace(string.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/TimeUtils.java b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/TimeUtils.java
new file mode 100644
index 00000000..cf85b3e7
--- /dev/null
+++ b/shuffle-common/src/main/java/com/alibaba/flink/shuffle/common/utils/TimeUtils.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Collection of utilities about time intervals.
+ *
+ * This class is copied from Apache Flink (org.apache.flink.util.TimeUtils).
+ */
+public class TimeUtils {
+
+ private static final Map LABEL_TO_UNIT_MAP =
+ Collections.unmodifiableMap(initMap());
+
+ /**
+ * Parse the given string to a java {@link Duration}. The string is in format "{length
+ * value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will
+ * be considered as milliseconds.
+ *
+ * Supported time unit labels are:
+ *
+ *
+ * - DAYS: "d", "day"
+ *
- HOURS: "h", "hour"
+ *
- MINUTES: "min", "minute"
+ *
- SECONDS: "s", "sec", "second"
+ *
- MILLISECONDS: "ms", "milli", "millisecond"
+ *
- MICROSECONDS: "µs", "micro", "microsecond"
+ *
- NANOSECONDS: "ns", "nano", "nanosecond"
+ *
+ *
+ * @param text string to parse.
+ */
+ public static Duration parseDuration(String text) {
+ CommonUtils.checkNotNull(text);
+
+ final String trimmed = text.trim();
+ CommonUtils.checkArgument(
+ !trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String number = trimmed.substring(0, pos);
+ final String unitLabel = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+ if (number.isEmpty()) {
+ throw new NumberFormatException("text does not start with a number");
+ }
+
+ final long value;
+ try {
+ value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "The value '"
+ + number
+ + "' cannot be re represented as 64bit number (numeric overflow).");
+ }
+
+ if (unitLabel.isEmpty()) {
+ return Duration.of(value, ChronoUnit.MILLIS);
+ }
+
+ ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
+ if (unit != null) {
+ return Duration.of(value, unit);
+ } else {
+ throw new IllegalArgumentException(
+ "Time interval unit label '"
+ + unitLabel
+ + "' does not match any of the recognized units: "
+ + TimeUnit.getAllUnits());
+ }
+ }
+
+ private static Map initMap() {
+ Map labelToUnit = new HashMap<>();
+ for (TimeUnit timeUnit : TimeUnit.values()) {
+ for (String label : timeUnit.getLabels()) {
+ labelToUnit.put(label, timeUnit.getUnit());
+ }
+ }
+ return labelToUnit;
+ }
+
+ /** Enum which defines time unit, mostly used to parse value from configuration file. */
+ private enum TimeUnit {
+ DAYS(ChronoUnit.DAYS, singular("d"), plural("day")),
+ HOURS(ChronoUnit.HOURS, singular("h"), plural("hour")),
+ MINUTES(ChronoUnit.MINUTES, singular("min"), plural("minute")),
+ SECONDS(ChronoUnit.SECONDS, singular("s"), plural("sec"), plural("second")),
+ MILLISECONDS(ChronoUnit.MILLIS, singular("ms"), plural("milli"), plural("millisecond")),
+ MICROSECONDS(ChronoUnit.MICROS, singular("µs"), plural("micro"), plural("microsecond")),
+ NANOSECONDS(ChronoUnit.NANOS, singular("ns"), plural("nano"), plural("nanosecond"));
+
+ private static final String PLURAL_SUFFIX = "s";
+
+ private final List labels;
+
+ private final ChronoUnit unit;
+
+ TimeUnit(ChronoUnit unit, String[]... labels) {
+ this.unit = unit;
+ this.labels =
+ Arrays.stream(labels)
+ .flatMap(ls -> Arrays.stream(ls))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @param label the original label
+ * @return the singular format of the original label
+ */
+ private static String[] singular(String label) {
+ return new String[] {label};
+ }
+
+ /**
+ * @param label the original label
+ * @return both the singular format and plural format of the original label
+ */
+ private static String[] plural(String label) {
+ return new String[] {label, label + PLURAL_SUFFIX};
+ }
+
+ public List getLabels() {
+ return labels;
+ }
+
+ public ChronoUnit getUnit() {
+ return unit;
+ }
+
+ public static String getAllUnits() {
+ return Arrays.stream(TimeUnit.values())
+ .map(TimeUnit::createTimeUnitString)
+ .collect(Collectors.joining(", "));
+ }
+
+ private static String createTimeUnitString(TimeUnit timeUnit) {
+ return timeUnit.name() + ": (" + String.join(" | ", timeUnit.getLabels()) + ")";
+ }
+ }
+}
diff --git a/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/config/ConfigurationTest.java b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/config/ConfigurationTest.java
new file mode 100644
index 00000000..084939ed
--- /dev/null
+++ b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/config/ConfigurationTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.config;
+
+import com.alibaba.flink.shuffle.common.exception.ConfigurationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/** Tests for {@link Configuration}. */
+public class ConfigurationTest {
+
+ private static final String KEY_1 = "remote-shuffle.test.key1";
+
+ private static final String KEY_2 = "remote-shuffle.test.key2";
+
+ private static final String KEY_3 = "remote-shuffle.test.key3";
+
+ @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testGetBoolean() {
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(false);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(false);
+ ConfigOption option3 = new ConfigOption(KEY_3).defaultValue(false);
+
+ Properties properties = new Properties();
+ properties.put(KEY_1, "true");
+ properties.put(KEY_2, "illegal");
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(true, configuration.getBoolean(KEY_1));
+ assertEquals(true, configuration.getBoolean(KEY_1, false));
+ assertEquals(true, configuration.getBoolean(option1));
+ assertEquals(true, configuration.getBoolean(option1, false));
+
+ checkGetIllegalValue(() -> configuration.getBoolean(KEY_2));
+ checkGetIllegalValue(() -> configuration.getBoolean(KEY_2, false));
+ checkGetIllegalValue(() -> configuration.getBoolean(option2));
+ checkGetIllegalValue(() -> configuration.getBoolean(option2, false));
+
+ assertNull(configuration.getBoolean(KEY_3));
+ assertEquals(false, configuration.getBoolean(KEY_3, false));
+ assertEquals(true, configuration.getBoolean(KEY_3, true));
+ assertEquals(option3.defaultValue(), configuration.getBoolean(option3));
+ assertEquals(false, configuration.getBoolean(option3, false));
+ assertEquals(true, configuration.getBoolean(option3, true));
+ }
+
+ @Test
+ public void testGetByte() {
+ Byte defaultValue = 'X';
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(defaultValue);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(defaultValue);
+ ConfigOption option3 = new ConfigOption(KEY_3).defaultValue(defaultValue);
+
+ Byte value1 = 'O';
+ Properties properties = new Properties();
+ properties.put(KEY_1, value1.toString());
+ properties.put(KEY_2, "illegal");
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(value1, configuration.getByte(KEY_1));
+ assertEquals(value1, configuration.getByte(KEY_1, defaultValue));
+ assertEquals(value1, configuration.getByte(option1));
+ assertEquals(value1, configuration.getByte(option1, defaultValue));
+
+ checkGetIllegalValue(() -> configuration.getByte(KEY_2));
+ checkGetIllegalValue(() -> configuration.getByte(KEY_2, defaultValue));
+ checkGetIllegalValue(() -> configuration.getByte(option2));
+ checkGetIllegalValue(() -> configuration.getByte(option2, defaultValue));
+
+ assertNull(configuration.getByte(KEY_3));
+ assertEquals(defaultValue, configuration.getByte(KEY_3, defaultValue));
+ assertEquals(option3.defaultValue(), configuration.getByte(option3));
+ assertEquals(value1, configuration.getByte(option3, value1));
+ }
+
+ @Test
+ public void testGetShort() {
+ Short defaultValue = 1;
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(defaultValue);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(defaultValue);
+ ConfigOption option3 = new ConfigOption(KEY_3).defaultValue(defaultValue);
+
+ Short value1 = 1024;
+ Properties properties = new Properties();
+ properties.put(KEY_1, value1.toString());
+ properties.put(KEY_2, "illegal");
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(value1, configuration.getShort(KEY_1));
+ assertEquals(value1, configuration.getShort(KEY_1, defaultValue));
+ assertEquals(value1, configuration.getShort(option1));
+ assertEquals(value1, configuration.getShort(option1, defaultValue));
+
+ checkGetIllegalValue(() -> configuration.getShort(KEY_2));
+ checkGetIllegalValue(() -> configuration.getShort(KEY_2, defaultValue));
+ checkGetIllegalValue(() -> configuration.getShort(option2));
+ checkGetIllegalValue(() -> configuration.getShort(option2, defaultValue));
+
+ assertNull(configuration.getShort(KEY_3));
+ assertEquals(defaultValue, configuration.getShort(KEY_3, defaultValue));
+ assertEquals(option3.defaultValue(), configuration.getShort(option3));
+ assertEquals(value1, configuration.getShort(option3, value1));
+ }
+
+ @Test
+ public void testGetInteger() {
+ Integer defaultValue = 1;
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(defaultValue);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(defaultValue);
+ ConfigOption option3 = new ConfigOption(KEY_3).defaultValue(defaultValue);
+
+ Integer value1 = 1024;
+ Properties properties = new Properties();
+ properties.put(KEY_1, value1.toString());
+ properties.put(KEY_2, "illegal");
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(value1, configuration.getInteger(KEY_1));
+ assertEquals(value1, configuration.getInteger(KEY_1, defaultValue));
+ assertEquals(value1, configuration.getInteger(option1));
+ assertEquals(value1, configuration.getInteger(option1, defaultValue));
+
+ checkGetIllegalValue(() -> configuration.getInteger(KEY_2));
+ checkGetIllegalValue(() -> configuration.getInteger(KEY_2, defaultValue));
+ checkGetIllegalValue(() -> configuration.getInteger(option2));
+ checkGetIllegalValue(() -> configuration.getInteger(option2, defaultValue));
+
+ assertNull(configuration.getInteger(KEY_3));
+ assertEquals(defaultValue, configuration.getInteger(KEY_3, defaultValue));
+ assertEquals(option3.defaultValue(), configuration.getInteger(option3));
+ assertEquals(value1, configuration.getInteger(option3, value1));
+ }
+
+ @Test
+ public void testGetLong() {
+ Long defaultValue = 1L;
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(defaultValue);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(defaultValue);
+ ConfigOption option3 = new ConfigOption(KEY_3).defaultValue(defaultValue);
+
+ Long value1 = 1024L;
+ Properties properties = new Properties();
+ properties.put(KEY_1, value1.toString());
+ properties.put(KEY_2, "illegal");
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(value1, configuration.getLong(KEY_1));
+ assertEquals(value1, configuration.getLong(KEY_1, defaultValue));
+ assertEquals(value1, configuration.getLong(option1));
+ assertEquals(value1, configuration.getLong(option1, defaultValue));
+
+ checkGetIllegalValue(() -> configuration.getLong(KEY_2));
+ checkGetIllegalValue(() -> configuration.getLong(KEY_2, defaultValue));
+ checkGetIllegalValue(() -> configuration.getLong(option2));
+ checkGetIllegalValue(() -> configuration.getLong(option2, defaultValue));
+
+ assertNull(configuration.getLong(KEY_3));
+ assertEquals(defaultValue, configuration.getLong(KEY_3, defaultValue));
+ assertEquals(option3.defaultValue(), configuration.getLong(option3));
+ assertEquals(value1, configuration.getLong(option3, value1));
+ }
+
+ @Test
+ public void testGetFloat() {
+ Float defaultValue = 1.0F;
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(defaultValue);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(defaultValue);
+ ConfigOption option3 = new ConfigOption(KEY_3).defaultValue(defaultValue);
+
+ Float value1 = 1024.0F;
+ Properties properties = new Properties();
+ properties.put(KEY_1, value1.toString());
+ properties.put(KEY_2, "illegal");
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(value1, configuration.getFloat(KEY_1));
+ assertEquals(value1, configuration.getFloat(KEY_1, defaultValue));
+ assertEquals(value1, configuration.getFloat(option1));
+ assertEquals(value1, configuration.getFloat(option1, defaultValue));
+
+ checkGetIllegalValue(() -> configuration.getFloat(KEY_2));
+ checkGetIllegalValue(() -> configuration.getFloat(KEY_2, defaultValue));
+ checkGetIllegalValue(() -> configuration.getFloat(option2));
+ checkGetIllegalValue(() -> configuration.getFloat(option2, defaultValue));
+
+ assertNull(configuration.getFloat(KEY_3));
+ assertEquals(defaultValue, configuration.getFloat(KEY_3, defaultValue));
+ assertEquals(option3.defaultValue(), configuration.getFloat(option3));
+ assertEquals(value1, configuration.getFloat(option3, value1));
+ }
+
+ @Test
+ public void testGetDouble() {
+ Double defaultValue = 1.0;
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(defaultValue);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(defaultValue);
+ ConfigOption option3 = new ConfigOption(KEY_3).defaultValue(defaultValue);
+
+ Double value1 = 1024.0;
+ Properties properties = new Properties();
+ properties.put(KEY_1, value1.toString());
+ properties.put(KEY_2, "illegal");
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(value1, configuration.getDouble(KEY_1));
+ assertEquals(value1, configuration.getDouble(KEY_1, defaultValue));
+ assertEquals(value1, configuration.getDouble(option1));
+ assertEquals(value1, configuration.getDouble(option1, defaultValue));
+
+ checkGetIllegalValue(() -> configuration.getDouble(KEY_2));
+ checkGetIllegalValue(() -> configuration.getDouble(KEY_2, defaultValue));
+ checkGetIllegalValue(() -> configuration.getDouble(option2));
+ checkGetIllegalValue(() -> configuration.getDouble(option2, defaultValue));
+
+ assertNull(configuration.getDouble(KEY_3));
+ assertEquals(defaultValue, configuration.getDouble(KEY_3, defaultValue));
+ assertEquals(option3.defaultValue(), configuration.getDouble(option3));
+ assertEquals(value1, configuration.getDouble(option3, value1));
+ }
+
+ @Test
+ public void testGetString() {
+ String defaultValue = "hello";
+ ConfigOption option1 = new ConfigOption(KEY_1).defaultValue(defaultValue);
+ ConfigOption option2 = new ConfigOption(KEY_2).defaultValue(defaultValue);
+
+ String value1 = "world";
+ Properties properties = new Properties();
+ properties.put(KEY_1, value1);
+
+ Configuration configuration = new Configuration(properties);
+
+ assertEquals(value1, configuration.getString(KEY_1));
+ assertEquals(value1, configuration.getString(KEY_1, defaultValue));
+ assertEquals(value1, configuration.getString(option1));
+ assertEquals(value1, configuration.getString(option1, defaultValue));
+
+ assertNull(configuration.getString(KEY_3));
+ assertEquals(defaultValue, configuration.getString(KEY_3, defaultValue));
+ assertEquals(option2.defaultValue(), configuration.getString(option2));
+ assertEquals(value1, configuration.getString(option2, value1));
+ }
+
+ @Test
+ public void testLoadConfigurationFromFile() throws Exception {
+ File confFile = temporaryFolder.newFile(Configuration.REMOTE_SHUFFLE_CONF_FILENAME);
+
+ String value1 = "value1";
+ String value2 = "value2";
+ String value3 = "value3";
+
+ try (FileWriter fileWriter = new FileWriter(confFile)) {
+ fileWriter.write(KEY_1 + ": " + value1 + "\n");
+ fileWriter.write(KEY_2 + ": " + value2 + "\n");
+ fileWriter.write("#" + KEY_3 + ": " + value3 + "\n");
+ }
+
+ Configuration configuration = new Configuration(confFile.getParent());
+ assertEquals(value1, configuration.getString(KEY_1));
+ assertEquals(value2, configuration.getString(KEY_2));
+ assertNull(configuration.getString(KEY_3));
+ }
+
+ @Test
+ public void testDynamicConfiguration() throws Exception {
+ File confFile = temporaryFolder.newFile(Configuration.REMOTE_SHUFFLE_CONF_FILENAME);
+
+ String value1 = "value1";
+ String value2 = "value2";
+ String value3 = "value3";
+
+ try (FileWriter fileWriter = new FileWriter(confFile)) {
+ fileWriter.write(KEY_1 + ": " + value1 + "\n");
+ fileWriter.write(KEY_2 + ": " + value2 + "\n");
+ }
+
+ Properties dynamicConfiguration = new Properties();
+ dynamicConfiguration.setProperty(KEY_1, value3);
+
+ Configuration configuration = new Configuration(confFile.getParent(), dynamicConfiguration);
+ assertEquals(value3, configuration.getString(KEY_1));
+ assertEquals(value2, configuration.getString(KEY_2));
+ }
+
+ private void checkGetIllegalValue(Runnable runnable) {
+ try {
+ runnable.run();
+ } catch (ConfigurationException ignored) {
+ // expected
+ return;
+ }
+ fail("Should throw IllegalArgumentException.");
+ }
+}
diff --git a/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/CommonUtilsTest.java b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/CommonUtilsTest.java
new file mode 100644
index 00000000..52111543
--- /dev/null
+++ b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/CommonUtilsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for utilities in {@link CommonUtils}. */
+public class CommonUtilsTest {
+
+ @Test
+ public void testConcatAndSplitByteArrays() {
+ byte[] first = new byte[] {1, 2, 3, 4};
+ byte[] second = new byte[] {1, 2, 3};
+
+ byte[] concat = CommonUtils.concatByteArrays(first, second);
+ assertEquals(15, concat.length);
+
+ List split = CommonUtils.splitByteArrays(concat);
+ assertEquals(2, split.size());
+ assertArrayEquals(first, split.get(0));
+ assertArrayEquals(second, split.get(1));
+ }
+
+ @Test
+ public void testConvertBetweenHexAndByteArray() {
+ byte[] bytes = CommonUtils.randomBytes(16);
+ String hex = CommonUtils.bytesToHexString(bytes);
+ byte[] decoded = CommonUtils.hexStringToBytes(hex);
+ assertArrayEquals(bytes, decoded);
+ }
+}
diff --git a/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/ExceptionUtilsTest.java b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/ExceptionUtilsTest.java
new file mode 100644
index 00000000..3caa57e5
--- /dev/null
+++ b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/ExceptionUtilsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for utilities in {@link CommonUtils}. */
+public class ExceptionUtilsTest {
+
+ @Test
+ public void testSummaryErrorMessageStack() {
+ Throwable t = new Throwable("msg");
+ assertEquals("[java.lang.Throwable: msg]", ExceptionUtils.summaryErrorMessageStack(t));
+
+ t = new Throwable();
+ assertEquals("[java.lang.Throwable: null]", ExceptionUtils.summaryErrorMessageStack(t));
+
+ t = new Throwable("msga", new Throwable0("msgb"));
+ assertEquals(
+ "[java.lang.Throwable: msga] -> [com.alibaba.flink.shuffle.common.utils.ExceptionUtilsTest$Throwable0: msgb]",
+ ExceptionUtils.summaryErrorMessageStack(t));
+
+ t = new Throwable0("msga", new Throwable());
+ assertEquals(
+ "[com.alibaba.flink.shuffle.common.utils.ExceptionUtilsTest$Throwable0: msga] -> [java.lang.Throwable: null]",
+ ExceptionUtils.summaryErrorMessageStack(t));
+
+ t = new Throwable0("msga", new Throwable(null, new Throwable0("msgb")));
+ assertEquals(
+ "[com.alibaba.flink.shuffle.common.utils.ExceptionUtilsTest$Throwable0: msga] -> [java.lang.Throwable: null] -> [com.alibaba.flink.shuffle.common.utils.ExceptionUtilsTest$Throwable0: msgb]",
+ ExceptionUtils.summaryErrorMessageStack(t));
+ }
+
+ private class Throwable0 extends Throwable {
+ Throwable0(String msg) {
+ super(msg);
+ }
+
+ Throwable0(String msg, Throwable t) {
+ super(msg, t);
+ }
+ }
+}
diff --git a/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/FatalErrorsExitUtilsTest.java b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/FatalErrorsExitUtilsTest.java
new file mode 100644
index 00000000..5d99d866
--- /dev/null
+++ b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/FatalErrorsExitUtilsTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link FatalErrorExitUtils}. */
+public class FatalErrorsExitUtilsTest {
+
+ @Test
+ public void testNeedStopProcess() {
+ FatalErrorExitUtils.setNeedStopProcess(false);
+ FatalErrorExitUtils.exitProcessIfNeeded(-1, new IOException("ignore"));
+ assertFalse(FatalErrorExitUtils.isNeedStopProcess());
+
+ FatalErrorExitUtils.setNeedStopProcess(true);
+ assertTrue(FatalErrorExitUtils.isNeedStopProcess());
+ }
+}
diff --git a/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/SingleThreadExecutorValidatorTest.java b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/SingleThreadExecutorValidatorTest.java
new file mode 100644
index 00000000..0de1e100
--- /dev/null
+++ b/shuffle-common/src/test/java/com/alibaba/flink/shuffle/common/utils/SingleThreadExecutorValidatorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.common.utils;
+
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.fail;
+
+/** Tests the {@link SingleThreadExecutorValidator}. */
+public class SingleThreadExecutorValidatorTest {
+
+ @Test
+ public void testRunningInSameThread() throws ExecutionException, InterruptedException {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ SingleThreadExecutorValidator validator = new SingleThreadExecutorValidator(executor);
+
+ // The following call should succeed.
+ executor.submit(validator::assertRunningInTargetThread).get();
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testRunningInDifferentThread() {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ SingleThreadExecutorValidator validator = new SingleThreadExecutorValidator(executor);
+
+ try {
+ validator.assertRunningInTargetThread();
+ fail("The check should failed");
+ } catch (RuntimeException e) {
+ // Expected exception
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+}
diff --git a/shuffle-common/src/test/resources/log4j2-test.properties b/shuffle-common/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..d7fcb327
--- /dev/null
+++ b/shuffle-common/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/shuffle-coordinator/pom.xml b/shuffle-coordinator/pom.xml
new file mode 100644
index 00000000..7404c9aa
--- /dev/null
+++ b/shuffle-coordinator/pom.xml
@@ -0,0 +1,172 @@
+
+
+
+
+ com.alibaba.flink.shuffle
+ flink-shuffle-parent
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ shuffle-coordinator
+
+
+ 2.21.0
+
+
+
+
+ com.alibaba.flink.shuffle
+ shuffle-rpc
+ ${project.version}
+
+
+
+ com.alibaba.flink.shuffle
+ shuffle-core
+ ${project.version}
+
+
+
+ com.alibaba.flink.shuffle
+ shuffle-common
+ ${project.version}
+
+
+
+ com.alibaba.flink.shuffle
+ shuffle-storage
+ ${project.version}
+
+
+
+ com.alibaba.flink.shuffle
+ shuffle-transfer
+ ${project.version}
+
+
+
+ com.alibaba.flink.shuffle
+ shuffle-metrics
+ ${project.version}
+
+
+
+ org.apache.flink
+ flink-shaded-zookeeper-3
+ ${zookeeper.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.3.2
+
+
+
+ org.apache.flink
+ flink-rpc-core
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-shaded-netty
+ 4.1.49.Final-${flink.shaded.version}
+ provided
+
+
+
+ com.alibaba.flink.shuffle
+ shuffle-core
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.apache.curator
+ curator-test
+ ${curator.version}
+ test
+
+
+
+ log4j
+ log4j
+
+
+
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ jar
+ test
+
+
+
+
+
+
+ src/main/resources-filtered
+ true
+
+
+
+
+
+ pl.project13.maven
+ git-commit-id-plugin
+
+
+ get-the-git-infos
+ validate
+
+ revision
+
+
+
+
+ ${project.basedir}/../.git
+ false
+ false
+ false
+
+
+
+ true
+
+
+
+
+
+
diff --git a/shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/client/ShuffleManagerClient.java b/shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/client/ShuffleManagerClient.java
new file mode 100644
index 00000000..c96cc95f
--- /dev/null
+++ b/shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/client/ShuffleManagerClient.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.flink.shuffle.client;
+
+import com.alibaba.flink.shuffle.coordinator.manager.JobDataPartitionDistribution;
+import com.alibaba.flink.shuffle.coordinator.manager.ShuffleResource;
+import com.alibaba.flink.shuffle.coordinator.worker.ShuffleWorkerMetrics;
+import com.alibaba.flink.shuffle.core.ids.DataSetID;
+import com.alibaba.flink.shuffle.core.ids.InstanceID;
+import com.alibaba.flink.shuffle.core.ids.JobID;
+import com.alibaba.flink.shuffle.core.ids.MapPartitionID;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/** Client to interact with ShuffleManager to request and release shuffle resource. */
+public interface ShuffleManagerClient extends AutoCloseable {
+
+ void start();
+
+ void synchronizeWorkerStatus(Set initialWorkers) throws Exception;
+
+ CompletableFuture requestShuffleResource(
+ DataSetID dataSetId,
+ MapPartitionID mapPartitionId,
+ int numberOfSubpartitions,
+ String dataPartitionFactoryName);
+
+ void releaseShuffleResource(DataSetID dataSetId, MapPartitionID mapPartitionId);
+
+ CompletableFuture getNumberOfRegisteredWorkers();
+
+ CompletableFuture