Skip to content

Commit

Permalink
Fixed number of committed records stat. Also fixed bug when acknowled…
Browse files Browse the repository at this point in the history
…gements enabled

Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Aug 8, 2023
1 parent 3272a8f commit b4cca7d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -66,6 +65,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows";
static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors";
static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers";
static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
static final String DEFAULT_KEY = "message";

private volatile long lastCommitTime;
Expand All @@ -83,12 +83,14 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
private Set<TopicPartition> partitionsToReset;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Map<Integer, TopicPartitionCommitTracker> partitionCommitTrackerMap;
private List<Map<TopicPartition, Range<Long>>> acknowledgedOffsets;
private Integer numberOfPositiveAcknowledgements;
private Integer numberOfNegativeAcknowledgements;
private Integer numberOfRecordsFailedToParse;
private Integer numberOfDeserializationErrors;
private Integer numberOfBufferSizeOverflows;
private Integer numberOfPollAuthErrors;
private long numberOfRecordsCommitted;
private final boolean acknowledgementsEnabled;
private final Duration acknowledgementsTimeout;
private final KafkaTopicMetrics topicMetrics;
Expand All @@ -108,6 +110,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
this.shutdownInProgress = shutdownInProgress;
this.consumer = consumer;
this.buffer = buffer;
this.numberOfRecordsCommitted = 0;
this.numberOfRecordsFailedToParse = 0;
this.numberOfDeserializationErrors = 0;
this.numberOfBufferSizeOverflows = 0;
Expand All @@ -118,6 +121,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
this.offsetsToCommit = new HashMap<>();
this.metricsUpdatedTime = Instant.now().getEpochSecond();
this.topicMetrics.register(consumer);
this.acknowledgedOffsets = new ArrayList<>();
this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout();
this.metricsUpdateInterval = sourceConfig.getMetricsUpdateInterval().getSeconds();
// If the timeout value is different from default value, then enable acknowledgements automatically.
Expand All @@ -132,7 +136,10 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
this.lastCommitTime = System.currentTimeMillis();
}

public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range<Long> offsetRange) {
long min = offsetRange.getMinimum();
long max = offsetRange.getMaximum();
numberOfRecordsCommitted += (max - min + 1);
if (Objects.isNull(offsetAndMetadata)) {
return;
}
Expand All @@ -146,20 +153,9 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo
acknowledgementSetManager.create((result) -> {
if (result == true) {
numberOfPositiveAcknowledgements++;
offsets.forEach((partition, offsetRange) -> {
try {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata);
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e);
}
});
synchronized(acknowledgedOffsets) {
acknowledgedOffsets.add(offsets);
}
} else {
numberOfNegativeAcknowledgements++;
offsets.forEach((partition, offsetRange) -> {
Expand All @@ -182,7 +178,7 @@ public <T> void consumeRecords() throws Exception {
iterateRecordPartitions(records, acknowledgementSet, offsets);
if (!acknowledgementsEnabled) {
offsets.forEach((partition, offsetRange) ->
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1)));
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange));
} else {
acknowledgementSet.complete();
}
Expand All @@ -199,7 +195,7 @@ public <T> void consumeRecords() throws Exception {
}
}

private void resetOrCommitOffsets() {
private void resetOffsets() {
if (partitionsToReset.size() > 0) {
partitionsToReset.forEach(partition -> {
try {
Expand All @@ -211,9 +207,35 @@ private void resetOrCommitOffsets() {
});
partitionsToReset.clear();
}
}

void processAcknowledgedOffsets() {
synchronized(acknowledgedOffsets) {
acknowledgedOffsets.forEach(offsets -> {
offsets.forEach((partition, offsetRange) -> {
try {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange);
} catch (Exception e) {
LOG.error("Failed committed offsets upon positive acknowledgement {}", partition, e);
}
});
});
acknowledgedOffsets.clear();
}
}

private void commitOffsets() {
if (topicConfig.getAutoCommit()) {
return;
}
processAcknowledgedOffsets();
long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) {
return;
Expand Down Expand Up @@ -247,6 +269,7 @@ public void updateMetrics() {
topicMetrics.setMetric(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements);
topicMetrics.setMetric(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements);
topicMetrics.setMetric(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0);
topicMetrics.setMetric(consumer, NUMBER_OF_RECORDS_COMMITTED, numberOfRecordsCommitted);

metricsUpdatedTime = curTime;
}
Expand All @@ -257,7 +280,8 @@ public void run() {
consumer.subscribe(Arrays.asList(topicName));
while (!shutdownInProgress.get()) {
try {
resetOrCommitOffsets();
resetOffsets();
commitOffsets();
consumeRecords();
updateMetrics();
} catch (Exception exp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetri
camelCaseMap.put("records-lag-max", "recordsLagMax");
camelCaseMap.put("records-lead-min", "recordsLeadMin");
camelCaseMap.put("commit-rate", "commitRate");
camelCaseMap.put("commit-total", "commitTotal");
camelCaseMap.put("join-rate", "joinRate");
camelCaseMap.put("incoming-byte-rate", "incomingByteRate");
camelCaseMap.put("outgoing-byte-rate", "outgoingByteRate");
Expand All @@ -56,11 +55,11 @@ private String getCamelCaseName(final String name) {
return camelCaseName;
}

public void setMetric(final KafkaConsumer consumer, final String metricName, Integer metricValue) {
public void setMetric(final KafkaConsumer consumer, final String metricName, Number metricValue) {
synchronized(consumerMetricsMap) {
Map<String, Object> cmetrics = consumerMetricsMap.get(consumer);
if (cmetrics != null) {
cmetrics.put(metricName, (double)metricValue);
cmetrics.put(metricName, metricValue.doubleValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -202,6 +201,7 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
Thread.sleep(10000);
} catch (Exception e){}

consumer.processAcknowledgedOffsets();
offsetsToCommit = consumer.getOffsetsToCommit();
Assertions.assertEquals(offsetsToCommit.size(), 1);
offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> {
Expand Down

0 comments on commit b4cca7d

Please sign in to comment.