From 2d3b0bdff3d658970a747cf4eea6a6f926a5db24 Mon Sep 17 00:00:00 2001 From: lklhdu <49940747+lklhdu@users.noreply.github.com> Date: Mon, 11 Sep 2023 10:57:57 +0800 Subject: [PATCH] =?UTF-8?q?[AMORO-1931][Flink]=20Logstore=20supports=20'gr?= =?UTF-8?q?oup-offsets'=20and=20'specific-o=E2=80=A6=20(#1949)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [AMORO-1931][Flink] Logstore supports 'group-offsets' and 'specific-offsets' Startup mode (#1932) (cherry picked from commit 3e9f709ac78cf827b1598637f4d174390eedf3a8) --- .../log/kafka/LogKafkaSourceBuilder.java | 85 ++++++++++++++++--- .../table/descriptors/ArcticValidator.java | 21 +++-- .../log/kafka/LogKafkaSourceBuilder.java | 85 ++++++++++++++++--- .../table/descriptors/ArcticValidator.java | 21 +++-- .../log/kafka/LogKafkaSourceBuilder.java | 85 ++++++++++++++++--- .../table/descriptors/ArcticValidator.java | 9 ++ 6 files changed, 264 insertions(+), 42 deletions(-) diff --git a/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java b/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java index f925c06e41..c9504d9bee 100644 --- a/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java +++ b/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java @@ -38,9 +38,9 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -48,8 +48,11 @@ import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_EARLIEST; +import static 
com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_GROUP_OFFSETS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_LATEST; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_SPECIFIC_OFFSETS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_SPECIFIC_OFFSETS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.fetchLogstorePrefixProperties; import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.getLogTopic; @@ -75,6 +78,8 @@ public class LogKafkaSourceBuilder { private static final String[] REQUIRED_CONFIGS = { ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG }; + private static final String PARTITION = "partition"; + private static final String OFFSET = "offset"; // The subscriber specifies the partitions to subscribe to. private KafkaSubscriber subscriber; // Users can specify the starting / stopping offset initializer. 
@@ -207,6 +212,7 @@ public LogKafkaSourceBuilder setPartitions(Set partitions) { public LogKafkaSourceBuilder setStartingOffsets( OffsetsInitializer startingOffsetsInitializer) { this.startingOffsetsInitializer = startingOffsetsInitializer; + LOG.info("Setting LogKafkaSource starting offset: {}", startingOffsetsInitializer); return this; } @@ -410,13 +416,6 @@ private void setupKafkaProperties() { private void setupStartupMode() { String startupMode = CompatiblePropertyUtil.propertyAsString(tableProperties, SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE.defaultValue()).toLowerCase(); - long startupTimestampMillis = 0L; - if (Objects.equals(startupMode.toLowerCase(), SCAN_STARTUP_MODE_TIMESTAMP)) { - startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull( - tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()), - String.format("'%s' should be set in '%s' mode", - SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP))); - } switch (startupMode) { case SCAN_STARTUP_MODE_EARLIEST: @@ -426,12 +425,39 @@ private void setupStartupMode() { setStartingOffsets(OffsetsInitializer.latest()); break; case SCAN_STARTUP_MODE_TIMESTAMP: + long startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull( + tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()), + String.format("'%s' should be set in '%s' mode", + SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP))); setStartingOffsets(OffsetsInitializer.timestamp(startupTimestampMillis)); break; + case SCAN_STARTUP_MODE_GROUP_OFFSETS: + setStartingOffsets(OffsetsInitializer.committedOffsets()); + break; + case SCAN_STARTUP_MODE_SPECIFIC_OFFSETS: + Map specificOffsets = new HashMap<>(); + String specificOffsetsStrOpt = Preconditions.checkNotNull( + tableProperties.get(SCAN_STARTUP_SPECIFIC_OFFSETS.key()), + String.format("'%s' should be set in '%s' mode", + SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_SPECIFIC_OFFSETS)); + final Map offsetMap = + 
parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key()); + offsetMap.forEach( + (partition, offset) -> { + final TopicPartition topicPartition = + new TopicPartition(getLogTopic(tableProperties).get(0), partition); + specificOffsets.put(topicPartition, offset); + } + ); + setStartingOffsets(OffsetsInitializer.offsets(specificOffsets)); + break; default: throw new ValidationException(String.format( - "%s only support '%s', '%s', '%s'. But input is '%s'", ArcticValidator.SCAN_STARTUP_MODE, - SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, SCAN_STARTUP_MODE_TIMESTAMP, startupMode)); + "%s only support '%s', '%s', '%s', '%s', '%s'. But input is '%s'", + ArcticValidator.SCAN_STARTUP_MODE, + SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, + SCAN_STARTUP_MODE_TIMESTAMP, SCAN_STARTUP_MODE_GROUP_OFFSETS, + SCAN_STARTUP_MODE_SPECIFIC_OFFSETS, startupMode)); } } @@ -504,4 +530,43 @@ private void sanityCheck() { subscriber, String.format("No topic is specified, '%s' should be set.", LOG_STORE_MESSAGE_TOPIC)); } + + public static Map parseSpecificOffsets( + String specificOffsetsStr, String optionKey) { + final Map offsetMap = new HashMap<>(); + final String[] pairs = specificOffsetsStr.split(";"); + final String validationExceptionMessage = + String.format( + "Invalid properties '%s' should follow the format " + + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.", + optionKey, specificOffsetsStr); + + if (pairs.length == 0) { + throw new ValidationException(validationExceptionMessage); + } + + for (String pair : pairs) { + if (null == pair || pair.length() == 0 || !pair.contains(",")) { + throw new ValidationException(validationExceptionMessage); + } + + final String[] kv = pair.split(","); + if (kv.length != 2 || + !kv[0].startsWith(PARTITION + ':') || + !kv[1].startsWith(OFFSET + ':')) { + throw new ValidationException(validationExceptionMessage); + } + + String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1); + 
String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1); + try { + final Integer partition = Integer.valueOf(partitionValue); + final Long offset = Long.valueOf(offsetValue); + offsetMap.put(partition, offset); + } catch (NumberFormatException e) { + throw new ValidationException(validationExceptionMessage, e); + } + } + return offsetMap; + } } diff --git a/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java b/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java index 055bb400d9..f6023bd730 100644 --- a/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java +++ b/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java @@ -78,6 +78,8 @@ public class ArcticValidator extends ConnectorDescriptorValidator { public static final String SCAN_STARTUP_MODE_EARLIEST = "earliest"; public static final String SCAN_STARTUP_MODE_LATEST = "latest"; public static final String SCAN_STARTUP_MODE_TIMESTAMP = "timestamp"; + public static final String SCAN_STARTUP_MODE_GROUP_OFFSETS = "group-offsets"; + public static final String SCAN_STARTUP_MODE_SPECIFIC_OFFSETS = "specific-offsets"; public static final ConfigOption ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE = ConfigOptions.key("log-store.consistency-guarantee.enabled") @@ -135,12 +137,19 @@ public class ArcticValidator extends ConnectorDescriptorValidator { " \"timestamp\": start from user-supplied timestamp for each partition.", ARCTIC_READ_MODE, ARCTIC_READ_FILE, ARCTIC_READ_MODE, ARCTIC_READ_LOG)); - public static final ConfigOption SCAN_STARTUP_TIMESTAMP_MILLIS = - ConfigOptions.key("scan.startup.timestamp-millis") - .longType() - .noDefaultValue() - .withDescription( - "Optional timestamp used in case of \"timestamp\" startup mode"); + public static final ConfigOption SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions + .key("scan.startup.timestamp-millis") + 
.longType() + .noDefaultValue() + .withDescription( + "Optional timestamp used in case of \"timestamp\" startup mode"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions + .key("scan.startup.specific-offsets") + .stringType() + .noDefaultValue() + .withDescription( + "Optional offsets used in case of \"specific-offsets\" startup mode"); public static final ConfigOption SUBMIT_EMPTY_SNAPSHOTS = ConfigOptions .key("submit.empty.snapshots") diff --git a/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java b/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java index 0dd1bc8b90..3f56d0ba9e 100644 --- a/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java +++ b/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java @@ -40,9 +40,9 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -50,8 +50,11 @@ import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_EARLIEST; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_GROUP_OFFSETS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_LATEST; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_SPECIFIC_OFFSETS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_SPECIFIC_OFFSETS; import static 
com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.fetchLogstorePrefixProperties; import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.getLogTopic; @@ -77,6 +80,8 @@ public class LogKafkaSourceBuilder { private static final String[] REQUIRED_CONFIGS = { ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG }; + private static final String PARTITION = "partition"; + private static final String OFFSET = "offset"; // The subscriber specifies the partitions to subscribe to. private KafkaSubscriber subscriber; // Users can specify the starting / stopping offset initializer. @@ -209,6 +214,7 @@ public LogKafkaSourceBuilder setPartitions(Set partitions) { public LogKafkaSourceBuilder setStartingOffsets( OffsetsInitializer startingOffsetsInitializer) { this.startingOffsetsInitializer = startingOffsetsInitializer; + LOG.info("Setting LogKafkaSource starting offset: {}", startingOffsetsInitializer); return this; } @@ -413,13 +419,6 @@ private void setupKafkaProperties() { private void setupStartupMode() { String startupMode = CompatiblePropertyUtil.propertyAsString(tableProperties, SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE.defaultValue()).toLowerCase(); - long startupTimestampMillis = 0L; - if (Objects.equals(startupMode.toLowerCase(), SCAN_STARTUP_MODE_TIMESTAMP)) { - startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull( - tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()), - String.format("'%s' should be set in '%s' mode", - SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP))); - } switch (startupMode) { case SCAN_STARTUP_MODE_EARLIEST: @@ -429,12 +428,39 @@ private void setupStartupMode() { setStartingOffsets(OffsetsInitializer.latest()); break; case SCAN_STARTUP_MODE_TIMESTAMP: + long startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull( + 
tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()), + String.format("'%s' should be set in '%s' mode", + SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP))); setStartingOffsets(OffsetsInitializer.timestamp(startupTimestampMillis)); break; + case SCAN_STARTUP_MODE_GROUP_OFFSETS: + setStartingOffsets(OffsetsInitializer.committedOffsets()); + break; + case SCAN_STARTUP_MODE_SPECIFIC_OFFSETS: + Map specificOffsets = new HashMap<>(); + String specificOffsetsStrOpt = Preconditions.checkNotNull( + tableProperties.get(SCAN_STARTUP_SPECIFIC_OFFSETS.key()), + String.format("'%s' should be set in '%s' mode", + SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_SPECIFIC_OFFSETS)); + final Map offsetMap = + parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key()); + offsetMap.forEach( + (partition, offset) -> { + final TopicPartition topicPartition = + new TopicPartition(getLogTopic(tableProperties).get(0), partition); + specificOffsets.put(topicPartition, offset); + } + ); + setStartingOffsets(OffsetsInitializer.offsets(specificOffsets)); + break; default: throw new ValidationException(String.format( - "%s only support '%s', '%s', '%s'. But input is '%s'", ArcticValidator.SCAN_STARTUP_MODE, - SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, SCAN_STARTUP_MODE_TIMESTAMP, startupMode)); + "%s only support '%s', '%s', '%s', '%s', '%s'. 
But input is '%s'", + ArcticValidator.SCAN_STARTUP_MODE, + SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, + SCAN_STARTUP_MODE_TIMESTAMP, SCAN_STARTUP_MODE_GROUP_OFFSETS, + SCAN_STARTUP_MODE_SPECIFIC_OFFSETS, startupMode)); } } @@ -507,4 +533,43 @@ private void sanityCheck() { subscriber, String.format("No topic is specified, '%s' should be set.", LOG_STORE_MESSAGE_TOPIC)); } + + public static Map parseSpecificOffsets( + String specificOffsetsStr, String optionKey) { + final Map offsetMap = new HashMap<>(); + final String[] pairs = specificOffsetsStr.split(";"); + final String validationExceptionMessage = + String.format( + "Invalid properties '%s' should follow the format " + + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.", + optionKey, specificOffsetsStr); + + if (pairs.length == 0) { + throw new ValidationException(validationExceptionMessage); + } + + for (String pair : pairs) { + if (null == pair || pair.length() == 0 || !pair.contains(",")) { + throw new ValidationException(validationExceptionMessage); + } + + final String[] kv = pair.split(","); + if (kv.length != 2 || + !kv[0].startsWith(PARTITION + ':') || + !kv[1].startsWith(OFFSET + ':')) { + throw new ValidationException(validationExceptionMessage); + } + + String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1); + String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1); + try { + final Integer partition = Integer.valueOf(partitionValue); + final Long offset = Long.valueOf(offsetValue); + offsetMap.put(partition, offset); + } catch (NumberFormatException e) { + throw new ValidationException(validationExceptionMessage, e); + } + } + return offsetMap; + } } diff --git a/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java b/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java index 2a1b95156a..fb36b1062d 100644 --- 
a/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java +++ b/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java @@ -78,6 +78,8 @@ public class ArcticValidator extends ConnectorDescriptorValidator { public static final String SCAN_STARTUP_MODE_EARLIEST = "earliest"; public static final String SCAN_STARTUP_MODE_LATEST = "latest"; public static final String SCAN_STARTUP_MODE_TIMESTAMP = "timestamp"; + public static final String SCAN_STARTUP_MODE_GROUP_OFFSETS = "group-offsets"; + public static final String SCAN_STARTUP_MODE_SPECIFIC_OFFSETS = "specific-offsets"; public static final ConfigOption ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE = ConfigOptions.key("log-store.consistency-guarantee.enabled") @@ -121,11 +123,11 @@ public class ArcticValidator extends ConnectorDescriptorValidator { .defaultValue(2048) .withDescription("The target number of records for Iceberg reader fetch batch."); - public static final ConfigOption SCAN_STARTUP_MODE = ConfigOptions - .key("scan.startup.mode") - .stringType() - .defaultValue(SCAN_STARTUP_MODE_LATEST) - .withDescription(String.format("Optional startup mode for arctic source, valid values are " + + public static final ConfigOption SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue(SCAN_STARTUP_MODE_LATEST) + .withDescription(String.format("Optional startup mode for arctic source, valid values are " + "\"earliest\" or \"latest\", \"timestamp\". If %s values %s, \"earliest\":" + " read earliest table data including base and change files from" + " the current snapshot, \"latest\": read all incremental data in the change table starting from the" + @@ -133,7 +135,7 @@ public class ArcticValidator extends ConnectorDescriptorValidator { " If %s values %s, \"earliest\": start from the earliest offset possible." 
+ " \"latest\": start from the latest offset," + " \"timestamp\": start from user-supplied timestamp for each partition.", - ARCTIC_READ_MODE, ARCTIC_READ_FILE, ARCTIC_READ_MODE, ARCTIC_READ_LOG)); + ARCTIC_READ_MODE, ARCTIC_READ_FILE, ARCTIC_READ_MODE, ARCTIC_READ_LOG)); public static final ConfigOption SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key("scan.startup.timestamp-millis") @@ -142,6 +144,13 @@ public class ArcticValidator extends ConnectorDescriptorValidator { .withDescription( "Optional timestamp used in case of \"timestamp\" startup mode"); + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSETS = + ConfigOptions.key("scan.startup.specific-offsets") + .stringType() + .noDefaultValue() + .withDescription( + "Optional offsets used in case of \"specific-offsets\" startup mode"); + public static final ConfigOption SUBMIT_EMPTY_SNAPSHOTS = ConfigOptions .key("submit.empty.snapshots") .booleanType() diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java index 0dd1bc8b90..3f56d0ba9e 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/source/log/kafka/LogKafkaSourceBuilder.java @@ -40,9 +40,9 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -50,8 +50,11 @@ import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_EARLIEST; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_GROUP_OFFSETS; import static 
com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_LATEST; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_SPECIFIC_OFFSETS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_SPECIFIC_OFFSETS; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.fetchLogstorePrefixProperties; import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.getLogTopic; @@ -77,6 +80,8 @@ public class LogKafkaSourceBuilder { private static final String[] REQUIRED_CONFIGS = { ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG }; + private static final String PARTITION = "partition"; + private static final String OFFSET = "offset"; // The subscriber specifies the partitions to subscribe to. private KafkaSubscriber subscriber; // Users can specify the starting / stopping offset initializer. 
@@ -209,6 +214,7 @@ public LogKafkaSourceBuilder setPartitions(Set partitions) { public LogKafkaSourceBuilder setStartingOffsets( OffsetsInitializer startingOffsetsInitializer) { this.startingOffsetsInitializer = startingOffsetsInitializer; + LOG.info("Setting LogKafkaSource starting offset: {}", startingOffsetsInitializer); return this; } @@ -413,13 +419,6 @@ private void setupKafkaProperties() { private void setupStartupMode() { String startupMode = CompatiblePropertyUtil.propertyAsString(tableProperties, SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE.defaultValue()).toLowerCase(); - long startupTimestampMillis = 0L; - if (Objects.equals(startupMode.toLowerCase(), SCAN_STARTUP_MODE_TIMESTAMP)) { - startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull( - tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()), - String.format("'%s' should be set in '%s' mode", - SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP))); - } switch (startupMode) { case SCAN_STARTUP_MODE_EARLIEST: @@ -429,12 +428,39 @@ private void setupStartupMode() { setStartingOffsets(OffsetsInitializer.latest()); break; case SCAN_STARTUP_MODE_TIMESTAMP: + long startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull( + tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()), + String.format("'%s' should be set in '%s' mode", + SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP))); setStartingOffsets(OffsetsInitializer.timestamp(startupTimestampMillis)); break; + case SCAN_STARTUP_MODE_GROUP_OFFSETS: + setStartingOffsets(OffsetsInitializer.committedOffsets()); + break; + case SCAN_STARTUP_MODE_SPECIFIC_OFFSETS: + Map specificOffsets = new HashMap<>(); + String specificOffsetsStrOpt = Preconditions.checkNotNull( + tableProperties.get(SCAN_STARTUP_SPECIFIC_OFFSETS.key()), + String.format("'%s' should be set in '%s' mode", + SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_SPECIFIC_OFFSETS)); + final Map offsetMap = + 
parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key()); + offsetMap.forEach( + (partition, offset) -> { + final TopicPartition topicPartition = + new TopicPartition(getLogTopic(tableProperties).get(0), partition); + specificOffsets.put(topicPartition, offset); + } + ); + setStartingOffsets(OffsetsInitializer.offsets(specificOffsets)); + break; default: throw new ValidationException(String.format( - "%s only support '%s', '%s', '%s'. But input is '%s'", ArcticValidator.SCAN_STARTUP_MODE, - SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, SCAN_STARTUP_MODE_TIMESTAMP, startupMode)); + "%s only support '%s', '%s', '%s', '%s', '%s'. But input is '%s'", + ArcticValidator.SCAN_STARTUP_MODE, + SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, + SCAN_STARTUP_MODE_TIMESTAMP, SCAN_STARTUP_MODE_GROUP_OFFSETS, + SCAN_STARTUP_MODE_SPECIFIC_OFFSETS, startupMode)); } } @@ -507,4 +533,43 @@ private void sanityCheck() { subscriber, String.format("No topic is specified, '%s' should be set.", LOG_STORE_MESSAGE_TOPIC)); } + + public static Map parseSpecificOffsets( + String specificOffsetsStr, String optionKey) { + final Map offsetMap = new HashMap<>(); + final String[] pairs = specificOffsetsStr.split(";"); + final String validationExceptionMessage = + String.format( + "Invalid properties '%s' should follow the format " + + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.", + optionKey, specificOffsetsStr); + + if (pairs.length == 0) { + throw new ValidationException(validationExceptionMessage); + } + + for (String pair : pairs) { + if (null == pair || pair.length() == 0 || !pair.contains(",")) { + throw new ValidationException(validationExceptionMessage); + } + + final String[] kv = pair.split(","); + if (kv.length != 2 || + !kv[0].startsWith(PARTITION + ':') || + !kv[1].startsWith(OFFSET + ':')) { + throw new ValidationException(validationExceptionMessage); + } + + String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1); + 
String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1); + try { + final Integer partition = Integer.valueOf(partitionValue); + final Long offset = Long.valueOf(offsetValue); + offsetMap.put(partition, offset); + } catch (NumberFormatException e) { + throw new ValidationException(validationExceptionMessage, e); + } + } + return offsetMap; + } } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java index c9ffb223b7..343c8a1e3f 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java @@ -78,6 +78,8 @@ public class ArcticValidator extends ConnectorDescriptorValidator { public static final String SCAN_STARTUP_MODE_EARLIEST = "earliest"; public static final String SCAN_STARTUP_MODE_LATEST = "latest"; public static final String SCAN_STARTUP_MODE_TIMESTAMP = "timestamp"; + public static final String SCAN_STARTUP_MODE_GROUP_OFFSETS = "group-offsets"; + public static final String SCAN_STARTUP_MODE_SPECIFIC_OFFSETS = "specific-offsets"; public static final ConfigOption ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE = ConfigOptions.key("log-store.consistency-guarantee.enabled") @@ -142,6 +144,13 @@ public class ArcticValidator extends ConnectorDescriptorValidator { .withDescription( "Optional timestamp used in case of \"timestamp\" startup mode"); + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSETS = + ConfigOptions.key("scan.startup.specific-offsets") + .stringType() + .noDefaultValue() + .withDescription( + "Optional offsets used in case of \"specific-offsets\" startup mode"); + public static final ConfigOption SUBMIT_EMPTY_SNAPSHOTS = ConfigOptions .key("submit.empty.snapshots") .booleanType()