Skip to content

Commit

Permalink
[AMORO-1931][Flink] Logstore supports 'group-offsets' and 'specific-offsets' Startup mode (
Browse files Browse the repository at this point in the history
apache#1949)

[AMORO-1931][Flink] Logstore supports 'group-offsets' and 'specific-offsets' Startup mode (apache#1932)

(cherry picked from commit 3e9f709)
  • Loading branch information
lklhdu authored Sep 11, 2023
1 parent f721b9a commit 2d3b0bd
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.regex.Pattern;

import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_EARLIEST;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_GROUP_OFFSETS;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_LATEST;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_SPECIFIC_OFFSETS;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.fetchLogstorePrefixProperties;
import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.getLogTopic;
Expand All @@ -75,6 +78,8 @@ public class LogKafkaSourceBuilder {
private static final String[] REQUIRED_CONFIGS = {
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG
};
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
// The subscriber specifies the partitions to subscribe to.
private KafkaSubscriber subscriber;
// Users can specify the starting / stopping offset initializer.
Expand Down Expand Up @@ -207,6 +212,7 @@ public LogKafkaSourceBuilder setPartitions(Set<TopicPartition> partitions) {
public LogKafkaSourceBuilder setStartingOffsets(
OffsetsInitializer startingOffsetsInitializer) {
this.startingOffsetsInitializer = startingOffsetsInitializer;
LOG.info("Setting LogKafkaSource starting offset: {}", startingOffsetsInitializer);
return this;
}

Expand Down Expand Up @@ -410,13 +416,6 @@ private void setupKafkaProperties() {
private void setupStartupMode() {
String startupMode = CompatiblePropertyUtil.propertyAsString(tableProperties, SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE.defaultValue()).toLowerCase();
long startupTimestampMillis = 0L;
if (Objects.equals(startupMode.toLowerCase(), SCAN_STARTUP_MODE_TIMESTAMP)) {
startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull(
tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()),
String.format("'%s' should be set in '%s' mode",
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP)));
}

switch (startupMode) {
case SCAN_STARTUP_MODE_EARLIEST:
Expand All @@ -426,12 +425,39 @@ private void setupStartupMode() {
setStartingOffsets(OffsetsInitializer.latest());
break;
case SCAN_STARTUP_MODE_TIMESTAMP:
long startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull(
tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()),
String.format("'%s' should be set in '%s' mode",
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP)));
setStartingOffsets(OffsetsInitializer.timestamp(startupTimestampMillis));
break;
case SCAN_STARTUP_MODE_GROUP_OFFSETS:
setStartingOffsets(OffsetsInitializer.committedOffsets());
break;
case SCAN_STARTUP_MODE_SPECIFIC_OFFSETS:
Map<TopicPartition, Long> specificOffsets = new HashMap<>();
String specificOffsetsStrOpt = Preconditions.checkNotNull(
tableProperties.get(SCAN_STARTUP_SPECIFIC_OFFSETS.key()),
String.format("'%s' should be set in '%s' mode",
SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_SPECIFIC_OFFSETS));
final Map<Integer, Long> offsetMap =
parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
offsetMap.forEach(
(partition, offset) -> {
final TopicPartition topicPartition =
new TopicPartition(getLogTopic(tableProperties).get(0), partition);
specificOffsets.put(topicPartition, offset);
}
);
setStartingOffsets(OffsetsInitializer.offsets(specificOffsets));
break;
default:
throw new ValidationException(String.format(
"%s only support '%s', '%s', '%s'. But input is '%s'", ArcticValidator.SCAN_STARTUP_MODE,
SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, SCAN_STARTUP_MODE_TIMESTAMP, startupMode));
"%s only support '%s', '%s', '%s', '%s', '%s'. But input is '%s'",
ArcticValidator.SCAN_STARTUP_MODE,
SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST,
SCAN_STARTUP_MODE_TIMESTAMP, SCAN_STARTUP_MODE_GROUP_OFFSETS,
SCAN_STARTUP_MODE_SPECIFIC_OFFSETS, startupMode));
}
}

Expand Down Expand Up @@ -504,4 +530,43 @@ private void sanityCheck() {
subscriber,
String.format("No topic is specified, '%s' should be set.", LOG_STORE_MESSAGE_TOPIC));
}

/**
 * Parses the value of a 'specific-offsets' option into a partition-to-offset map.
 *
 * <p>The expected format is {@code partition:0,offset:42;partition:1,offset:300}:
 * entries separated by ';', each entry a 'partition:&lt;int&gt;,offset:&lt;long&gt;' pair.
 *
 * @param specificOffsetsStr raw option value to parse
 * @param optionKey the option key, used only to build error messages
 * @return map from Kafka partition id to starting offset
 * @throws ValidationException if the string does not match the expected format
 */
public static Map<Integer, Long> parseSpecificOffsets(
    String specificOffsetsStr, String optionKey) {
  // Single error message reused by every validation failure below.
  final String validationExceptionMessage =
      String.format(
          "Invalid properties '%s' should follow the format " +
              "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
          optionKey, specificOffsetsStr);

  final String[] pairs = specificOffsetsStr.split(";");
  // split() yields an empty array for inputs like ";" — reject them up front.
  if (pairs.length == 0) {
    throw new ValidationException(validationExceptionMessage);
  }

  final Map<Integer, Long> offsetMap = new HashMap<>();
  for (String pair : pairs) {
    if (pair == null || pair.isEmpty() || !pair.contains(",")) {
      throw new ValidationException(validationExceptionMessage);
    }

    final String[] kv = pair.split(",");
    if (kv.length != 2 ||
        !kv[0].startsWith(PARTITION + ':') ||
        !kv[1].startsWith(OFFSET + ':')) {
      throw new ValidationException(validationExceptionMessage);
    }

    // Strip the verified "partition:" / "offset:" prefixes by their known length.
    final String partitionValue = kv[0].substring(PARTITION.length() + 1);
    final String offsetValue = kv[1].substring(OFFSET.length() + 1);
    try {
      offsetMap.put(Integer.parseInt(partitionValue), Long.parseLong(offsetValue));
    } catch (NumberFormatException e) {
      // Preserve the cause so the unparsable number is visible in the stack trace.
      throw new ValidationException(validationExceptionMessage, e);
    }
  }
  return offsetMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
public static final String SCAN_STARTUP_MODE_EARLIEST = "earliest";
public static final String SCAN_STARTUP_MODE_LATEST = "latest";
public static final String SCAN_STARTUP_MODE_TIMESTAMP = "timestamp";
public static final String SCAN_STARTUP_MODE_GROUP_OFFSETS = "group-offsets";
public static final String SCAN_STARTUP_MODE_SPECIFIC_OFFSETS = "specific-offsets";

public static final ConfigOption<Boolean> ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE =
ConfigOptions.key("log-store.consistency-guarantee.enabled")
Expand Down Expand Up @@ -135,12 +137,19 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
" \"timestamp\": start from user-supplied timestamp for each partition.",
ARCTIC_READ_MODE, ARCTIC_READ_FILE, ARCTIC_READ_MODE, ARCTIC_READ_LOG));

public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
ConfigOptions.key("scan.startup.timestamp-millis")
.longType()
.noDefaultValue()
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
.key("scan.startup.timestamp-millis")
.longType()
.noDefaultValue()
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");

// Starting offsets per partition, consumed only in "specific-offsets" startup mode.
// The original description was a copy-paste of the timestamp option's text; fixed here.
public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
    .key("scan.startup.specific-offsets")
    .stringType()
    .noDefaultValue()
    .withDescription(
        "Optional offsets used in case of \"specific-offsets\" startup mode, " +
            "e.g. 'partition:0,offset:42;partition:1,offset:300'");

public static final ConfigOption<Boolean> SUBMIT_EMPTY_SNAPSHOTS = ConfigOptions
.key("submit.empty.snapshots")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,21 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.regex.Pattern;

import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_EARLIEST;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_GROUP_OFFSETS;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_LATEST;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_SPECIFIC_OFFSETS;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_MODE_TIMESTAMP;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.fetchLogstorePrefixProperties;
import static com.netease.arctic.flink.util.CompatibleFlinkPropertyUtil.getLogTopic;
Expand All @@ -77,6 +80,8 @@ public class LogKafkaSourceBuilder {
private static final String[] REQUIRED_CONFIGS = {
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG
};
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
// The subscriber specifies the partitions to subscribe to.
private KafkaSubscriber subscriber;
// Users can specify the starting / stopping offset initializer.
Expand Down Expand Up @@ -209,6 +214,7 @@ public LogKafkaSourceBuilder setPartitions(Set<TopicPartition> partitions) {
public LogKafkaSourceBuilder setStartingOffsets(
OffsetsInitializer startingOffsetsInitializer) {
this.startingOffsetsInitializer = startingOffsetsInitializer;
LOG.info("Setting LogKafkaSource starting offset: {}", startingOffsetsInitializer);
return this;
}

Expand Down Expand Up @@ -413,13 +419,6 @@ private void setupKafkaProperties() {
private void setupStartupMode() {
String startupMode = CompatiblePropertyUtil.propertyAsString(tableProperties, SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE.defaultValue()).toLowerCase();
long startupTimestampMillis = 0L;
if (Objects.equals(startupMode.toLowerCase(), SCAN_STARTUP_MODE_TIMESTAMP)) {
startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull(
tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()),
String.format("'%s' should be set in '%s' mode",
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP)));
}

switch (startupMode) {
case SCAN_STARTUP_MODE_EARLIEST:
Expand All @@ -429,12 +428,39 @@ private void setupStartupMode() {
setStartingOffsets(OffsetsInitializer.latest());
break;
case SCAN_STARTUP_MODE_TIMESTAMP:
long startupTimestampMillis = Long.parseLong(Preconditions.checkNotNull(
tableProperties.get(SCAN_STARTUP_TIMESTAMP_MILLIS.key()),
String.format("'%s' should be set in '%s' mode",
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_TIMESTAMP)));
setStartingOffsets(OffsetsInitializer.timestamp(startupTimestampMillis));
break;
case SCAN_STARTUP_MODE_GROUP_OFFSETS:
setStartingOffsets(OffsetsInitializer.committedOffsets());
break;
case SCAN_STARTUP_MODE_SPECIFIC_OFFSETS:
Map<TopicPartition, Long> specificOffsets = new HashMap<>();
String specificOffsetsStrOpt = Preconditions.checkNotNull(
tableProperties.get(SCAN_STARTUP_SPECIFIC_OFFSETS.key()),
String.format("'%s' should be set in '%s' mode",
SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_SPECIFIC_OFFSETS));
final Map<Integer, Long> offsetMap =
parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
offsetMap.forEach(
(partition, offset) -> {
final TopicPartition topicPartition =
new TopicPartition(getLogTopic(tableProperties).get(0), partition);
specificOffsets.put(topicPartition, offset);
}
);
setStartingOffsets(OffsetsInitializer.offsets(specificOffsets));
break;
default:
throw new ValidationException(String.format(
"%s only support '%s', '%s', '%s'. But input is '%s'", ArcticValidator.SCAN_STARTUP_MODE,
SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST, SCAN_STARTUP_MODE_TIMESTAMP, startupMode));
"%s only support '%s', '%s', '%s', '%s', '%s'. But input is '%s'",
ArcticValidator.SCAN_STARTUP_MODE,
SCAN_STARTUP_MODE_LATEST, SCAN_STARTUP_MODE_EARLIEST,
SCAN_STARTUP_MODE_TIMESTAMP, SCAN_STARTUP_MODE_GROUP_OFFSETS,
SCAN_STARTUP_MODE_SPECIFIC_OFFSETS, startupMode));
}
}

Expand Down Expand Up @@ -507,4 +533,43 @@ private void sanityCheck() {
subscriber,
String.format("No topic is specified, '%s' should be set.", LOG_STORE_MESSAGE_TOPIC));
}

/**
 * Parses a 'specific-offsets' option value of the form
 * {@code partition:0,offset:42;partition:1,offset:300} into a map keyed by
 * Kafka partition id.
 *
 * @param specificOffsetsStr raw option value to parse
 * @param optionKey the option key, included in validation error messages
 * @return map from partition id to starting offset
 * @throws ValidationException if the input does not match the expected format
 */
public static Map<Integer, Long> parseSpecificOffsets(
    String specificOffsetsStr, String optionKey) {
  // One shared error text for every malformed-input path.
  final String invalidFormatMessage =
      String.format(
          "Invalid properties '%s' should follow the format " +
              "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
          optionKey, specificOffsetsStr);

  final String[] entries = specificOffsetsStr.split(";");
  if (entries.length == 0) {
    throw new ValidationException(invalidFormatMessage);
  }

  final Map<Integer, Long> parsed = new HashMap<>();
  for (final String entry : entries) {
    if (entry == null || entry.length() == 0 || !entry.contains(",")) {
      throw new ValidationException(invalidFormatMessage);
    }

    final String[] parts = entry.split(",");
    final boolean wellFormed =
        parts.length == 2
            && parts[0].startsWith(PARTITION + ':')
            && parts[1].startsWith(OFFSET + ':');
    if (!wellFormed) {
      throw new ValidationException(invalidFormatMessage);
    }

    try {
      // Drop the "partition:" / "offset:" prefixes and parse the numeric tails.
      parsed.put(
          Integer.valueOf(parts[0].substring(parts[0].indexOf(":") + 1)),
          Long.valueOf(parts[1].substring(parts[1].indexOf(":") + 1)));
    } catch (NumberFormatException e) {
      throw new ValidationException(invalidFormatMessage, e);
    }
  }
  return parsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
public static final String SCAN_STARTUP_MODE_EARLIEST = "earliest";
public static final String SCAN_STARTUP_MODE_LATEST = "latest";
public static final String SCAN_STARTUP_MODE_TIMESTAMP = "timestamp";
public static final String SCAN_STARTUP_MODE_GROUP_OFFSETS = "group-offsets";
public static final String SCAN_STARTUP_MODE_SPECIFIC_OFFSETS = "specific-offsets";

public static final ConfigOption<Boolean> ARCTIC_LOG_CONSISTENCY_GUARANTEE_ENABLE =
ConfigOptions.key("log-store.consistency-guarantee.enabled")
Expand Down Expand Up @@ -121,19 +123,19 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
.defaultValue(2048)
.withDescription("The target number of records for Iceberg reader fetch batch.");

public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
.key("scan.startup.mode")
.stringType()
.defaultValue(SCAN_STARTUP_MODE_LATEST)
.withDescription(String.format("Optional startup mode for arctic source, valid values are " +
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.defaultValue(SCAN_STARTUP_MODE_LATEST)
.withDescription(String.format("Optional startup mode for arctic source, valid values are " +
"\"earliest\" or \"latest\", \"timestamp\". If %s values %s, \"earliest\":" +
" read earliest table data including base and change files from" +
" the current snapshot, \"latest\": read all incremental data in the change table starting from the" +
" current snapshot (the current snapshot will be excluded), \"timestamp\" has not supported yet." +
" If %s values %s, \"earliest\": start from the earliest offset possible." +
" \"latest\": start from the latest offset," +
" \"timestamp\": start from user-supplied timestamp for each partition.",
ARCTIC_READ_MODE, ARCTIC_READ_FILE, ARCTIC_READ_MODE, ARCTIC_READ_LOG));
ARCTIC_READ_MODE, ARCTIC_READ_FILE, ARCTIC_READ_MODE, ARCTIC_READ_LOG));

public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
ConfigOptions.key("scan.startup.timestamp-millis")
Expand All @@ -142,6 +144,13 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");

// Starting offsets per partition, consumed only in "specific-offsets" startup mode.
// The original description was a copy-paste of the timestamp option's text; fixed here.
public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS =
    ConfigOptions.key("scan.startup.specific-offsets")
        .stringType()
        .noDefaultValue()
        .withDescription(
            "Optional offsets used in case of \"specific-offsets\" startup mode, " +
                "e.g. 'partition:0,offset:42;partition:1,offset:300'");

public static final ConfigOption<Boolean> SUBMIT_EMPTY_SNAPSHOTS = ConfigOptions
.key("submit.empty.snapshots")
.booleanType()
Expand Down
Loading

0 comments on commit 2d3b0bd

Please sign in to comment.