Addressed review comments
Signed-off-by: Krishna Kondaka <[email protected]>
Krishna Kondaka committed Aug 8, 2023
1 parent f4a727d commit 3272a8f
Showing 4 changed files with 53 additions and 38 deletions.
KafkaSourceConfig.java
@@ -37,6 +37,7 @@ public boolean getInsecure() {
}

public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30);
public static final Duration DEFAULT_METRICS_UPDATE_INTERVAL = Duration.ofSeconds(60);

@JsonProperty("bootstrap_servers")
private List<String> bootStrapServers;
@@ -67,6 +68,9 @@ public boolean getInsecure() {
@JsonProperty("acknowledgments_timeout")
private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;

@JsonProperty("metrics_update_interval")
private Duration metricsUpdateInterval = DEFAULT_METRICS_UPDATE_INTERVAL;

@JsonProperty("client_dns_lookup")
private String clientDnsLookup;

@@ -82,6 +86,10 @@ public Duration getAcknowledgementsTimeout() {
return acknowledgementsTimeout;
}

public Duration getMetricsUpdateInterval() {
return metricsUpdateInterval;
}

public List<TopicConfig> getTopics() {
return topics;
}
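For context, the new metrics_update_interval property deserializes into a java.time.Duration and falls back to DEFAULT_METRICS_UPDATE_INTERVAL (60 seconds) when omitted. A minimal sketch of the binding, assuming plain Jackson with JavaTimeModule; Data Prepper may register its own Duration deserializer (for example, one accepting "60s"), so the ISO-8601 string below is only illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class MetricsIntervalBindingSketch {
    public static void main(String[] args) throws Exception {
        // Assumption: JavaTimeModule handles the Duration field (ISO-8601 form).
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
                .registerModule(new JavaTimeModule());
        KafkaSourceConfig config = mapper.readValue(
                "metrics_update_interval: PT30S", KafkaSourceConfig.class);
        System.out.println(config.getMetricsUpdateInterval().getSeconds()); // 30
    }
}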
KafkaSourceCustomConsumer.java
@@ -67,7 +67,6 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener
static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors";
static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers";
static final String DEFAULT_KEY = "message";
static final int METRICS_UPDATE_INTERVAL = 60;

private volatile long lastCommitTime;
private KafkaConsumer consumer= null;
@@ -94,6 +93,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener
private final Duration acknowledgementsTimeout;
private final KafkaTopicMetrics topicMetrics;
private long metricsUpdatedTime;
private long metricsUpdateInterval;

public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
@@ -119,6 +119,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
this.metricsUpdatedTime = Instant.now().getEpochSecond();
this.topicMetrics.register(consumer);
this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout();
this.metricsUpdateInterval = sourceConfig.getMetricsUpdateInterval().getSeconds();
// If the timeout value is different from default value, then enable acknowledgements automatically.
this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;
this.acknowledgementSetManager = acknowledgementSetManager;
@@ -237,15 +238,15 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {

public void updateMetrics() {
long curTime = Instant.now().getEpochSecond();
if (curTime - metricsUpdatedTime >= METRICS_UPDATE_INTERVAL) {
if (curTime - metricsUpdatedTime >= metricsUpdateInterval) {
topicMetrics.update(consumer);
topicMetrics.update(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors);
topicMetrics.update(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows);
topicMetrics.update(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse);
topicMetrics.update(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors);
topicMetrics.update(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements);
topicMetrics.update(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements);
topicMetrics.update(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0);
topicMetrics.setMetric(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors);
topicMetrics.setMetric(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows);
topicMetrics.setMetric(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse);
topicMetrics.setMetric(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors);
topicMetrics.setMetric(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements);
topicMetrics.setMetric(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements);
topicMetrics.setMetric(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0);

metricsUpdatedTime = curTime;
}
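The hard-coded METRICS_UPDATE_INTERVAL constant is gone: updateMetrics() now gates its flush on the configured interval, and the per-counter overload of update() has been renamed setMetric() to distinguish it from update(consumer), which pulls the Kafka client's own metrics. A self-contained sketch of the gating pattern (an assumed class; the names only mirror the diff):

import java.time.Instant;

public class ThrottledMetricsFlush {
    private final long intervalSeconds;                 // from metrics_update_interval
    private long lastFlushEpochSeconds = Instant.now().getEpochSecond();

    public ThrottledMetricsFlush(long intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    // Runs the flush action at most once per interval, like updateMetrics() above.
    public void maybeFlush(Runnable flush) {
        long now = Instant.now().getEpochSecond();
        if (now - lastFlushEpochSeconds >= intervalSeconds) {
            flush.run();                                // e.g. the setMetric(...) calls
            lastFlushEpochSeconds = now;
        }
    }
}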
KafkaSource.java
@@ -119,7 +119,7 @@ public void start(Buffer<Record<Event>> buffer) {
KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = topic.getGroupId();
KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics, sourceConfig.getMetricsUpdateInterval().getSeconds());
Properties consumerProperties = getConsumerProperties(topic, authProperties);
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
try {
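Note that the Duration is handed over as whole seconds via getSeconds(), so the metrics code operates at one-second granularity and a sub-second interval would truncate to zero. A standalone check of that Duration behavior (illustrative, not from the codebase):

import java.time.Duration;

public class IntervalUnitsCheck {
    public static void main(String[] args) {
        System.out.println(Duration.ofSeconds(60).getSeconds()); // 60
        System.out.println(Duration.ofMillis(500).getSeconds()); // 0 -- truncated
    }
}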
KafkaTopicMetrics.java
@@ -16,19 +16,19 @@
import java.util.HashMap;

public class KafkaTopicMetrics {
private int metricUpdateInterval;
private long metricUpdateInterval;
private final String topicName;
private long updateTime;
private Map<KafkaConsumer, Map<String, Object>> consumerMetricsMap;
private Map<String, String> camelCaseMap;
private final PluginMetrics pluginMetrics;

public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) {
public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics, final long metricUpdateInterval) {
this.pluginMetrics = pluginMetrics;
this.topicName = topicName;
this.consumerMetricsMap = new HashMap<>();
this.updateTime = Instant.now().getEpochSecond();
this.metricUpdateInterval = 60; //seconds
this.metricUpdateInterval = metricUpdateInterval;
this.camelCaseMap = new HashMap<>();
camelCaseMap.put("bytes-consumed-total", "bytesConsumedTotal");
camelCaseMap.put("records-consumed-total", "recordsConsumedTotal");
@@ -56,7 +56,7 @@ private String getCamelCaseName(final String name) {
return camelCaseName;
}

public void update(final KafkaConsumer consumer, final String metricName, Integer metricValue) {
public void setMetric(final KafkaConsumer consumer, final String metricName, Integer metricValue) {
synchronized(consumerMetricsMap) {
Map<String, Object> cmetrics = consumerMetricsMap.get(consumer);
if (cmetrics != null) {
@@ -65,12 +65,30 @@ public void update(final KafkaConsumer consumer, final String metricName, Integer
}
}

private void aggregateMetrics() {
synchronized (consumerMetricsMap) {
final Map<String, Double> aggregatedMetrics = new HashMap<>();
consumerMetricsMap.forEach((c, metricsMap) -> {
Double value = 0.0;
metricsMap.forEach((metricName, metricValue) -> {
if (metricValue instanceof Double) {
aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 0.0));
}
});
});
aggregatedMetrics.forEach((name, value) -> {
pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value);
});
}
}

public void update(final KafkaConsumer consumer) {
Map<String, Object> cmetrics = null;
synchronized(consumerMetricsMap) {
cmetrics = consumerMetricsMap.get(consumer);
}
if (cmetrics == null) {
// This should not happen...
if (Objects.isNull(cmetrics)) {
return;
}
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
@@ -79,31 +97,19 @@ public void update(final KafkaConsumer consumer) {
Metric value = entry.getValue();
String metricName = metric.name();
String metricGroup = metric.group();
if ((metricName.contains("consumed")) ||
((!metric.tags().containsKey("partition")) &&
(metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) ||
(metricName.equals("commit-rate") || metricName.equals("join-rate") || metricName.equals("commit-total")) ||
(metricName.equals("incoming-byte-rate") || metricName.equals("outgoing-byte-rate"))) {
cmetrics.put(metricName, value.metricValue());
if (Objects.nonNull(camelCaseMap.get(metricName))) {
if (metric.tags().containsKey("partition") &&
(metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) {
continue;
}
cmetrics.put(metricName, value.metricValue());
}
}
synchronized (consumerMetricsMap) {
long curTime = Instant.now().getEpochSecond();
if (curTime - updateTime > metricUpdateInterval) {
final Map<String, Double> aggregatedMetrics = new HashMap<>();
consumerMetricsMap.forEach((c, metricsMap) -> {
Double value = 0.0;
metricsMap.forEach((metricName, metricValue) -> {
if (metricValue instanceof Double) {
aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 0.0));
}
});
});
aggregatedMetrics.forEach((name, value) -> {
pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value);
});
updateTime = curTime;
}

long curTime = Instant.now().getEpochSecond();
if (curTime - updateTime > metricUpdateInterval) {
aggregateMetrics();
updateTime = curTime;
}
}
}
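The aggregation that previously lived inline in update(KafkaConsumer) is now the private aggregateMetrics() helper: it sums every Double-valued metric across all consumers of the topic and publishes each sum as a gauge named topic.<topicName>.<camelCaseMetric>. The sum is applied uniformly, including metrics such as records-lag-max. A toy illustration of the computation (standalone, with made-up values):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregationSketch {
    public static void main(String[] args) {
        // Metric snapshots for two consumers of the same topic (assumed numbers).
        List<Map<String, Object>> perConsumer = List.of(
                Map.of("bytes-consumed-total", 10.0, "records-lag-max", 3.0),
                Map.of("bytes-consumed-total", 5.0, "records-lag-max", 7.0));

        Map<String, Double> aggregated = new HashMap<>();
        for (Map<String, Object> metrics : perConsumer) {
            metrics.forEach((name, value) -> {
                if (value instanceof Double) {
                    aggregated.merge(name, (Double) value, Double::sum);
                }
            });
        }
        // Prints {records-lag-max=10.0, bytes-consumed-total=15.0}
        System.out.println(aggregated);
    }
}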
