Skip to content

Commit

Permalink
fix: don't record consumer rate if offsets haven't been committed rec… (
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve Mason authored Aug 14, 2023
2 parents 9259c55 + 4e17c6a commit cc11260
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K
var minimumTopicRateMeasurements = Optional.ofNullable(trigger.getMetadata().get("minimumTopicRateMeasurements")).map(Long::parseLong).orElse(3L);
var consumerRatePercentile = Optional.ofNullable(trigger.getMetadata().get("consumerRatePercentile")).map(Double::parseDouble).orElse(99D);
var minimumConsumerRateMeasurements = Optional.ofNullable(trigger.getMetadata().get("minimumConsumerRateMeasurements")).map(Long::parseLong).orElse(3L);
var consumerCommitTimeout = Optional.ofNullable(trigger.getMetadata().get("consumerCommitTimeout")).map(Duration::parse).orElseGet(() -> Duration.ofMinutes(1L));

logger.debug("Requesting kafka metrics for topic={} and consumerGroupId={}", topic, consumerGroupId);

Expand All @@ -53,6 +54,7 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K
lagModel.setMinimumConsumerRateMeasurements(minimumConsumerRateMeasurements);
lagModel.setTopicRatePercentile(topicRatePercentile);
lagModel.setMinimumTopicRateMeasurements(minimumTopicRateMeasurements);
lagModel.setConsumerCommitTimeout(consumerCommitTimeout);

var kafkaMetadata = KafkaMetadataCache.get(bootstrapServers);
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.brandwatch.kafka_pod_autoscaler.triggers.kafka;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.function.LongSupplier;

Expand All @@ -10,6 +12,7 @@
import com.google.common.annotations.VisibleForTesting;

import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;

public class TopicConsumerStats {
Expand All @@ -33,6 +36,9 @@ public class TopicConsumerStats {
@Getter
@Setter
private long minimumTopicRateMeasurements = 3;
@Getter
@Setter
private Duration consumerCommitTimeout = Duration.ofMinutes(1L);

public TopicConsumerStats() {
this(System::currentTimeMillis);
Expand Down Expand Up @@ -61,16 +67,21 @@ public void update(int replicaCount, Map<TopicPartition, Long> consumerOffsets,
.sum();

var newConsumerOffsets = new RecordedOffsets(now, consumerOffsets);
var newTopicEndOffsets = new RecordedOffsets(now, topicEndOffsets);
if (this.consumerOffsets != null) {
calculateRate(this.consumerOffsets, newConsumerOffsets)
.ifPresent(value -> historicalConsumerRates.addValue(value / (double) replicaCount));
if (this.consumerOffsets == null
|| this.consumerOffsets.haveAllChangedSince(newConsumerOffsets)
|| this.consumerOffsets.haveTimedout(clock, this.consumerCommitTimeout)) {
if (this.consumerOffsets != null) {
calculateRate(this.consumerOffsets, newConsumerOffsets)
.ifPresent(value -> historicalConsumerRates.addValue(value / (double) replicaCount));
}
this.consumerOffsets = newConsumerOffsets;
}

var newTopicEndOffsets = new RecordedOffsets(now, topicEndOffsets);
if (this.topicEndOffsets != null) {
calculateRate(this.topicEndOffsets, new RecordedOffsets(now, topicEndOffsets))
calculateRate(this.topicEndOffsets, newTopicEndOffsets)
.ifPresent(historicalTopicRates::addValue);
}
this.consumerOffsets = newConsumerOffsets;
this.topicEndOffsets = newTopicEndOffsets;
}

Expand Down Expand Up @@ -120,5 +131,13 @@ private OptionalDouble calculateRate(RecordedOffsets earliestOffsets, RecordedOf
}

record RecordedOffsets(long timestamp, Map<TopicPartition, Long> offsets) {
public boolean haveAllChangedSince(@NonNull RecordedOffsets otherOffsets) {
return offsets.keySet().stream()
.noneMatch(tp -> Objects.equals(otherOffsets.offsets.get(tp), offsets().get(tp)));
}

public boolean haveTimedout(@NonNull LongSupplier clock, @NonNull Duration consumerCommitTimeout) {
return (timestamp + consumerCommitTimeout.toMillis()) < clock.getAsLong();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.kafka.common.TopicPartition;
Expand All @@ -31,10 +33,11 @@ public void variousScenarios(List<UpdateCallParameters> updateCalls,

stats.setMinimumTopicRateMeasurements(0L);
stats.setMinimumConsumerRateMeasurements(0L);
stats.setConsumerCommitTimeout(Duration.ofSeconds(10L));

for (var call : updateCalls) {
stats.update(call.replicaCount, call.consumerOffsets, call.topicEndOffsets);
clock.addAndGet(1_000);
clock.addAndGet(call.tickBy);
}

assertThat(stats.getLag(), equalTo(expectedResults.lag()));
Expand All @@ -46,8 +49,9 @@ public static Stream<Arguments> variousScenarios() {
return Stream.of(
Arguments.of(noCalls(), expect(1, 0, OptionalDouble.empty(), OptionalDouble.empty())),
Arguments.of(oneCallSameOffsets(), expect(1, 0, OptionalDouble.empty(), OptionalDouble.empty())),
Arguments.of(unmovingTopic(), expect(1, 0, OptionalDouble.of(0D), OptionalDouble.of(0D))),
Arguments.of(stuckConsumer(), expect(1, 8, OptionalDouble.of(0D), OptionalDouble.of(4D))),
Arguments.of(manyOfTheSameOffset_withinCommitTimeout(), expect(1, 0, OptionalDouble.empty(), OptionalDouble.of(0D))),
Arguments.of(manyOfTheSameOffset_outsideCommitTimeout(), expect(1, 0, OptionalDouble.of(0D), OptionalDouble.of(0D))),
Arguments.of(stuckConsumer(), expect(1, 8, OptionalDouble.of(0D), OptionalDouble.of(0.4D))),
Arguments.of(slowConsumer(), expect(1, 24, OptionalDouble.of(2D), OptionalDouble.of(16D))),
Arguments.of(keepingUpConsumer(), expect(1, 0L, OptionalDouble.of(8D), OptionalDouble.of(16D))),
Arguments.of(keepingUpConsumer(), expect(2, 0L, OptionalDouble.of(16D), OptionalDouble.of(16D))),
Expand All @@ -65,19 +69,23 @@ private static List<UpdateCallParameters> oneCallSameOffsets() {
return List.of(call(2, evenOffsets(0L), evenOffsets(0L)));
}

private static List<UpdateCallParameters> unmovingTopic() {
return List.of(
call(2, evenOffsets(0L), evenOffsets(0L)),
call(2, evenOffsets(0L), evenOffsets(0L)),
call(2, evenOffsets(0L), evenOffsets(0L))
);
private static List<UpdateCallParameters> manyOfTheSameOffset_withinCommitTimeout() {
return IntStream.range(0, 10)
.mapToObj(i -> call(2, evenOffsets(0L), evenOffsets(0L)))
.toList();
}

private static List<UpdateCallParameters> manyOfTheSameOffset_outsideCommitTimeout() {
return IntStream.range(0, 1_000)
.mapToObj(i -> call(2, evenOffsets(0L), evenOffsets(0L), 10_000))
.toList();
}

private static List<UpdateCallParameters> stuckConsumer() {
return List.of(
call(2, evenOffsets(0L), evenOffsets(0L)),
call(2, evenOffsets(0L), evenOffsets(1L)),
call(2, evenOffsets(0L), evenOffsets(2L))
call(2, evenOffsets(0L), evenOffsets(0L), 10_000),
call(2, evenOffsets(0L), evenOffsets(1L), 10_000),
call(2, evenOffsets(0L), evenOffsets(2L), 10_000)
);
}

Expand Down Expand Up @@ -128,7 +136,14 @@ private static Map<TopicPartition, Long> evenOffsets(long offset) {
private static UpdateCallParameters call(int replicaCount,
Map<TopicPartition, Long> consumerOffsets,
Map<TopicPartition, Long> topicEndOffsets) {
return new UpdateCallParameters(replicaCount, consumerOffsets, topicEndOffsets);
return new UpdateCallParameters(replicaCount, consumerOffsets, topicEndOffsets, 1_000);
}

private static UpdateCallParameters call(int replicaCount,
Map<TopicPartition, Long> consumerOffsets,
Map<TopicPartition, Long> topicEndOffsets,
long tickBy) {
return new UpdateCallParameters(replicaCount, consumerOffsets, topicEndOffsets, tickBy);
}

private static ExpectedResults expect(int replicaCount, long lag, OptionalDouble consumerRate, OptionalDouble topicRate) {
Expand All @@ -138,7 +153,8 @@ private static ExpectedResults expect(int replicaCount, long lag, OptionalDouble
public record UpdateCallParameters(
int replicaCount,
Map<TopicPartition, Long> consumerOffsets,
Map<TopicPartition, Long> topicEndOffsets) {
Map<TopicPartition, Long> topicEndOffsets,
long tickBy) {
}

public record ExpectedResults(int replicaCount, long lag, OptionalDouble consumerRate, OptionalDouble topicRate) {
Expand Down

0 comments on commit cc11260

Please sign in to comment.