Add timestamp of committed offsets for consumer groups (for lag time)
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Jan 5, 2024
1 parent 5b993cc commit eced05f
Showing 5 changed files with 177 additions and 11 deletions.
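The stored timestamp lets consumer lag be expressed as a duration rather than only as an offset delta. Below is a minimal sketch, not part of this commit, of how lag time could be derived once the committed offset's timestamp is known; the class name and the sample timestamp are hypothetical.

import java.time.Duration;
import java.time.Instant;

class LagTimeSketch {
    // Lag time: how long ago the record at the committed offset was produced.
    static Duration lagTime(Instant committedOffsetTimestamp, Instant now) {
        return Duration.between(committedOffsetTimestamp, now);
    }

    public static void main(String[] args) {
        Instant committed = Instant.parse("2024-01-05T12:00:00Z"); // hypothetical value
        System.out.println("lag time: " + lagTime(committed, Instant.now()));
    }
}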
53 changes: 47 additions & 6 deletions src/main/java/com/github/eyefloaters/kmetadb/AdminFactory.java
@@ -2,6 +2,9 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
@@ -11,6 +14,9 @@

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
@@ -39,7 +45,9 @@ Map<String, Admin> getAdmins() {
return clusterNames.entrySet()
.stream()
.map(cluster -> {
var client = Admin.create(buildConfig(cluster.getKey()));
var adminConfig = buildConfig(AdminClientConfig.configNames(), cluster.getKey());
logConfig("Admin[" + cluster.getKey() + ']', adminConfig);
var client = Admin.create(adminConfig);
return Map.entry(cluster.getValue(), client);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -49,8 +57,29 @@ void closeAdmins(@Disposes Map<String, Admin> admins) {
admins.values().parallelStream().forEach(Admin::close);
}

Map<String, Object> buildConfig(String clusterKey) {
return AdminClientConfig.configNames()
@Produces
Map<String, Consumer<byte[], byte[]>> getConsumers() {
return clusterNames.entrySet()
.stream()
.map(cluster -> {
Set<String> configNames = ConsumerConfig.configNames().stream()
// Do not allow a group Id to be set for this application
.filter(Predicate.not(ConsumerConfig.GROUP_ID_CONFIG::equals))
.collect(Collectors.toSet());
var consumerConfig = buildConfig(configNames, cluster.getKey());
logConfig("Consumer[" + cluster.getKey() + ']', consumerConfig);
var client = new KafkaConsumer<byte[], byte[]>(consumerConfig);
return Map.entry(cluster.getValue(), client);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

void closeConsumers(@Disposes Map<String, Consumer<byte[], byte[]>> consumers) {
consumers.values().parallelStream().forEach(Consumer::close);
}

Map<String, Object> buildConfig(Set<String> configNames, String clusterKey) {
return configNames
.stream()
.map(configName -> getClusterConfig(clusterKey, configName)
.or(() -> getDefaultConfig(clusterKey, configName))
@@ -63,14 +92,14 @@ Map<String, Object> buildConfig(String clusterKey) {
Optional<String> getClusterConfig(String clusterKey, String configName) {
return config.getOptionalValue("kmetadb.kafka." + clusterKey + '.' + configName, String.class)
.map(cfg -> {
log.debugf("OVERRIDE config %s for cluster %s", configName, clusterKey);
log.tracef("OVERRIDE config %s for cluster %s", configName, clusterKey);
return removeQuotes(cfg);
});
}

Optional<String> getDefaultConfig(String clusterKey, String configName) {
if (defaultClusterConfigs.containsKey(configName)) {
log.debugf("DEFAULT config %s for cluster %s", configName, clusterKey);
log.tracef("DEFAULT config %s for cluster %s", configName, clusterKey);
String cfg = defaultClusterConfigs.get(configName).toString();
return Optional.of(removeQuotes(cfg));
}
@@ -79,6 +108,18 @@ Optional<String> getDefaultConfig(String clusterKey, String configName) {
}

String removeQuotes(String cfg) {
return cfg.replaceAll("(^[\"'])|([\"']$)", "");
return BOUNDARY_QUOTES.matcher(cfg).replaceAll("");
}

void logConfig(String clientType, Map<String, Object> config) {
if (log.isDebugEnabled()) {
String msg = config.entrySet()
.stream()
.map(entry -> "\t%s = %s".formatted(entry.getKey(), entry.getValue()))
.collect(Collectors.joining("\n", "%s configuration:\n", ""));
log.debugf(msg, clientType);
}
}

private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
}
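For context on the refactored configuration handling above: buildConfig now takes the set of allowed config names, resolving each one first from a cluster-specific property (kmetadb.kafka.<clusterKey>.<configName>) and then from the shared defaults, and the consumer factory filters out group.id so the application never joins a consumer group. The sketch below mirrors that resolution order with a hypothetical cluster key and values; it is an illustration, not the project's code.

import java.util.Map;
import java.util.Optional;

class ConfigResolutionSketch {
    // Hypothetical per-cluster override; the property prefix mirrors getClusterConfig() above.
    static final Map<String, String> clusterOverrides =
            Map.of("kmetadb.kafka.my-cluster.max.partition.fetch.bytes", "2048");
    // Shared default, as in application.properties (kafka.max.partition.fetch.bytes=1024).
    static final Map<String, String> defaults =
            Map.of("max.partition.fetch.bytes", "1024");

    static Optional<String> resolve(String clusterKey, String configName) {
        return Optional.ofNullable(
                    clusterOverrides.get("kmetadb.kafka." + clusterKey + '.' + configName))
                .or(() -> Optional.ofNullable(defaults.get(configName)));
    }

    public static void main(String[] args) {
        // Prints 2048: the cluster-specific override wins over the shared default.
        System.out.println(resolve("my-cluster", "max.partition.fetch.bytes").orElseThrow());
    }
}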
122 changes: 117 additions & 5 deletions src/main/java/com/github/eyefloaters/kmetadb/DataSync.java
@@ -12,12 +12,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
@@ -54,6 +56,7 @@
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
@@ -72,6 +75,9 @@ public class DataSync {
@Inject
Map<String, Admin> adminClients;

@Inject
Map<String, Consumer<byte[], byte[]>> consumers;

@Inject
DataSource dataSource;

@@ -114,7 +120,7 @@ void metadataLoop(String clusterName, Admin adminClient) {
}
}

LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(30));
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
}
}

@@ -425,7 +431,7 @@ Cluster refreshCluster(Timestamp now, String clusterName, DescribeClusterResult
reportSQLException(e1, "Failed to refresh `clusters` table");
}

return new Cluster(clusterId, kafkaId, nodes, controllerId);
return new Cluster(clusterName, clusterId, kafkaId, nodes, controllerId);
}

Cluster refreshNodes(Timestamp now, Cluster cluster) {
@@ -641,14 +647,119 @@ Map<String, Long> refreshConsumerGroups(Timestamp now, Cluster cluster, Admin ad
.collect(awaitingAll())
.join();

class TopicPartitionOffset {
final TopicPartition partition;
final long offset;
Timestamp offsetTimestamp;

TopicPartitionOffset(TopicPartition partition, long offset) {
this.partition = partition;
this.offset = offset;
}

TopicPartition partition() {
return partition;
}

long offset() {
return offset;
}

Timestamp offsetTimestamp() {
return offsetTimestamp;
}

void offsetTimestamp(Timestamp timestamp) {
offsetTimestamp = timestamp;
}
}

Consumer<byte[], byte[]> consumer = consumers.get(cluster.name());
Set<TopicPartition> allPartitions = groupOffsets.values()
.stream()
.map(Map::keySet)
.flatMap(Collection::stream)
.distinct()
.collect(Collectors.toSet());

var beginningOffsets = consumer.beginningOffsets(allPartitions);
var endOffsets = consumer.endOffsets(allPartitions);

Map<TopicPartition, List<TopicPartitionOffset>> targets = groupOffsets.values()
.stream()
.map(Map::entrySet)
.flatMap(Collection::stream)
.filter(e -> e.getValue().offset() < endOffsets.get(e.getKey()))
.filter(e -> e.getValue().offset() >= beginningOffsets.get(e.getKey()))
.map(e -> new TopicPartitionOffset(e.getKey(), e.getValue().offset()))
.collect(Collectors.groupingBy(TopicPartitionOffset::partition,
Collectors.toCollection(ArrayList::new)));

int rounds = targets.values().stream().mapToInt(Collection::size).max().orElse(0);

Set<TopicPartition> assignments = targets.keySet();

for (int r = 0; r < rounds; r++) {
consumer.assign(assignments);

int round = r;

assignments.forEach(topicPartition ->
consumer.seek(topicPartition, targets.get(topicPartition).get(round).offset()));

var result = consumer.poll(Duration.ofSeconds(10));
log.debugf("Consumer polled %s record(s) in round %d", result.count(), round);

assignments.forEach(topicPartition -> {
var records = result.records(topicPartition);

if (records.isEmpty()) {
// format args cast to avoid ambiguous method reference
log.infof("No records returned for offset %d in topic-partition %s-%d",
(Long) targets.get(topicPartition).get(round).offset(),
topicPartition.topic(),
(Integer) topicPartition.partition());
} else {
var rec = records.get(0);
Timestamp ts = Timestamp.from(Instant.ofEpochMilli(rec.timestamp()));
targets.get(topicPartition).get(round).offsetTimestamp(ts);

log.debugf("Timestamp of offset %d in topic-partition %s-%d is %s",
rec.offset(),
rec.topic(),
rec.partition(),
ts);
}
});

// Setup next round's assignments
assignments = targets.entrySet()
.stream()
.filter(e -> e.getValue().size() > round + 1)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}

consumer.unsubscribe();

try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(sql("consumer-group-offsets-merge"))) {
Instant t0 = Instant.now();

for (var group : groupOffsets.entrySet()) {
for (var partition : group.getValue().entrySet()) {
int p = 0;
stmt.setLong(++p, partition.getValue().offset());
long committedOffset = partition.getValue().offset();
Timestamp offsetTimestamp = Optional.ofNullable(targets.get(partition.getKey()))
.orElseGet(Collections::emptyList)
.stream()
.filter(tpo -> tpo.offset == committedOffset)
.findFirst()
.map(TopicPartitionOffset::offsetTimestamp)
.orElse(null);

stmt.setLong(++p, committedOffset);
stmt.setTimestamp(++p, offsetTimestamp);
stmt.setString(++p, partition.getValue().metadata());
stmt.setObject(++p, partition.getValue().leaderEpoch().orElse(null), Types.BIGINT);
stmt.setTimestamp(++p, now);
@@ -746,15 +857,16 @@ static String sql(String resourceName) {
}

record Cluster(
String name,
int id,
String kafkaId,
Collection<Node> nodes,
int controllerId,
Map<Integer, Map<String, LogDirDescription>> logDirs,
AtomicReference<QuorumInfo> quorum
) {
public Cluster(int id, String kafkaId, Collection<Node> nodes, int controllerId) {
this(id, kafkaId, nodes, controllerId, new HashMap<>(), new AtomicReference<>());
public Cluster(String name, int id, String kafkaId, Collection<Node> nodes, int controllerId) {
this(name, id, kafkaId, nodes, controllerId, new HashMap<>(), new AtomicReference<>());
}

private <T> T mapQuorum(Function<QuorumInfo, T> mapFn) {
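The new timestamp lookup in refreshConsumerGroups assigns the consumer to the committed topic-partitions, seeks to each committed offset, and takes the timestamp of the first record returned by a poll. A stripped-down, single-partition sketch of the same technique follows; the broker address, topic, partition, and offset are hypothetical, and the rounds/batching logic of the real code is omitted.

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

class OffsetTimestampSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // No group.id: the consumer only assigns and seeks, it never joins a group.

        try (var consumer = new KafkaConsumer<byte[], byte[]>(props)) {
            TopicPartition tp = new TopicPartition("example-topic", 0); // hypothetical
            long committedOffset = 42L;                                 // hypothetical

            consumer.assign(List.of(tp));
            consumer.seek(tp, committedOffset);

            var records = consumer.poll(Duration.ofSeconds(10)).records(tp);
            if (!records.isEmpty()) {
                // The first record at (or after) the committed offset carries the timestamp
                // that the commit stores in the new offset_timestamp column.
                Instant ts = Instant.ofEpochMilli(records.get(0).timestamp());
                System.out.printf("offset %d was produced at %s%n", records.get(0).offset(), ts);
            }
        }
    }
}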
7 changes: 7 additions & 0 deletions src/main/resources/application.properties
@@ -13,6 +13,13 @@ kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClient
kafka.sasl.login.refresh.min.period.seconds=60
kafka.sasl.login.refresh.buffer.seconds=60

kafka.allow.auto.create.topics=false
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.auto.offset.reset=earliest
kafka.group.id=
kafka.max.partition.fetch.bytes=1024

# Noisy logger in Kafka client
%prod.quarkus.log.category."org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin".level=WARN

Changes to the consumer-group-offsets-merge SQL statement
@@ -3,6 +3,7 @@ USING ( SELECT cg.cluster_id AS cluster_id
, cg.id AS consumer_group_id
, tp.id AS topic_partition_id
, ? AS "offset"
, CAST(? AS TIMESTAMP WITH TIME ZONE) AS offset_timestamp
, ? AS metadata
, ? AS leader_epoch
, CAST(? AS TIMESTAMP WITH TIME ZONE) AS refreshed_at
@@ -23,6 +24,7 @@ AND t.topic_partition_id = n.topic_partition_id

WHEN MATCHED
AND t."offset" IS NOT DISTINCT FROM n."offset"
AND t.offset_timestamp IS NOT DISTINCT FROM n.offset_timestamp
AND t.metadata IS NOT DISTINCT FROM n.metadata
AND t.leader_epoch IS NOT DISTINCT FROM n.leader_epoch
THEN
@@ -33,6 +35,7 @@ WHEN MATCHED
THEN
UPDATE
SET "offset" = n."offset"
, offset_timestamp = n.offset_timestamp
, metadata = n.metadata
, leader_epoch = n.leader_epoch
, modified_at = n.refreshed_at
@@ -44,6 +47,7 @@ WHEN NOT MATCHED
, consumer_group_id
, topic_partition_id
, "offset"
, offset_timestamp
, metadata
, leader_epoch
, discovered_at
@@ -54,6 +58,7 @@ WHEN NOT MATCHED
, n.consumer_group_id
, n.topic_partition_id
, n."offset"
, n.offset_timestamp
, n.metadata
, n.leader_epoch
, n.refreshed_at
1 change: 1 addition & 0 deletions src/main/resources/db/migration/V0.0.1__initial.sql
@@ -156,6 +156,7 @@ CREATE TABLE consumer_group_offsets
, consumer_group_id INT NOT NULL
, topic_partition_id INT NOT NULL
, "offset" BIGINT NOT NULL
, offset_timestamp TIMESTAMP WITH TIME ZONE
, metadata VARCHAR
, leader_epoch BIGINT
, discovered_at TIMESTAMP WITH TIME ZONE NOT NULL
