From 3272a8fd6c3dfac49c9a1b255a9b8b0434c2806f Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 8 Aug 2023 15:02:48 +0000 Subject: [PATCH] Addressed review comments Signed-off-by: Krishna Kondaka --- .../configuration/KafkaSourceConfig.java | 8 +++ .../consumer/KafkaSourceCustomConsumer.java | 19 +++--- .../plugins/kafka/source/KafkaSource.java | 2 +- .../plugins/kafka/util/KafkaTopicMetrics.java | 62 ++++++++++--------- 4 files changed, 53 insertions(+), 38 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java index 922c8f0d08..d1d6dbc19f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java @@ -37,6 +37,7 @@ public boolean getInsecure() { } public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30); + public static final Duration DEFAULT_METRICS_UPDATE_INTERVAL = Duration.ofSeconds(60); @JsonProperty("bootstrap_servers") private List bootStrapServers; @@ -67,6 +68,9 @@ public boolean getInsecure() { @JsonProperty("acknowledgments_timeout") private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; + @JsonProperty("metrics_update_interval") + private Duration metricsUpdateInterval = DEFAULT_METRICS_UPDATE_INTERVAL; + @JsonProperty("client_dns_lookup") private String clientDnsLookup; @@ -82,6 +86,10 @@ public Duration getAcknowledgementsTimeout() { return acknowledgementsTimeout; } + public Duration getMetricsUpdateInterval() { + return metricsUpdateInterval; + } + public List getTopics() { return topics; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 5f818997d0..3ef562a50c 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -67,7 +67,6 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors"; static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers"; static final String DEFAULT_KEY = "message"; - static final int METRICS_UPDATE_INTERVAL = 60; private volatile long lastCommitTime; private KafkaConsumer consumer= null; @@ -94,6 +93,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private final Duration acknowledgementsTimeout; private final KafkaTopicMetrics topicMetrics; private long metricsUpdatedTime; + private long metricsUpdateInterval; public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -119,6 +119,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.metricsUpdatedTime = Instant.now().getEpochSecond(); this.topicMetrics.register(consumer); this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout(); + this.metricsUpdateInterval = sourceConfig.getMetricsUpdateInterval().getSeconds(); // If the timeout value is different from default value, then enable acknowledgements automatically. this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; this.acknowledgementSetManager = acknowledgementSetManager; @@ -237,15 +238,15 @@ Map getOffsetsToCommit() { public void updateMetrics() { long curTime = Instant.now().getEpochSecond(); - if (curTime - metricsUpdatedTime >= METRICS_UPDATE_INTERVAL) { + if (curTime - metricsUpdatedTime >= metricsUpdateInterval) { topicMetrics.update(consumer); - topicMetrics.update(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors); - topicMetrics.update(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows); - topicMetrics.update(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse); - topicMetrics.update(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors); - topicMetrics.update(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements); - topicMetrics.update(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements); - topicMetrics.update(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0); + topicMetrics.setMetric(consumer, NUMBER_OF_DESERIALIZATION_ERRORS, numberOfDeserializationErrors); + topicMetrics.setMetric(consumer, NUMBER_OF_BUFFER_SIZE_OVERFLOWS, numberOfBufferSizeOverflows); + topicMetrics.setMetric(consumer, NUMBER_OF_RECORDS_FAILED_TO_PARSE, numberOfRecordsFailedToParse); + topicMetrics.setMetric(consumer, NUMBER_OF_POLL_AUTH_ERRORS, numberOfPollAuthErrors); + topicMetrics.setMetric(consumer, NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, numberOfPositiveAcknowledgements); + topicMetrics.setMetric(consumer, NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS , numberOfNegativeAcknowledgements); + topicMetrics.setMetric(consumer, NUMBER_OF_NON_CONSUMERS , (consumer.assignment().size() == 0) ? 1 : 0); metricsUpdatedTime = curTime; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index e41906870e..1e23b2be02 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -119,7 +119,7 @@ public void start(Buffer> buffer) { KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); - KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); + KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics, sourceConfig.getMetricsUpdateInterval().getSeconds()); Properties consumerProperties = getConsumerProperties(topic, authProperties); MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); try { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 6b240635d2..3c8cc7b6ed 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -16,19 +16,19 @@ import java.util.HashMap; public class KafkaTopicMetrics { - private int metricUpdateInterval; + private long metricUpdateInterval; private final String topicName; private long updateTime; private Map> consumerMetricsMap; private Map camelCaseMap; private final PluginMetrics pluginMetrics; - public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) { + public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics, final long metricUpdateInterval) { this.pluginMetrics = pluginMetrics; this.topicName = topicName; this.consumerMetricsMap = new HashMap<>(); this.updateTime = Instant.now().getEpochSecond(); - this.metricUpdateInterval = 60; //seconds + this.metricUpdateInterval = metricUpdateInterval; this.camelCaseMap = new HashMap<>(); camelCaseMap.put("bytes-consumed-total", "bytesConsumedTotal"); camelCaseMap.put("records-consumed-total", "recordsConsumedTotal"); @@ -56,7 +56,7 @@ private String getCamelCaseName(final String name) { return camelCaseName; } - public void update(final KafkaConsumer consumer, final String metricName, Integer metricValue) { + public void setMetric(final KafkaConsumer consumer, final String metricName, Integer metricValue) { synchronized(consumerMetricsMap) { Map cmetrics = consumerMetricsMap.get(consumer); if (cmetrics != null) { @@ -65,12 +65,30 @@ public void update(final KafkaConsumer consumer, final String metricName, Intege } } + private void aggregateMetrics() { + synchronized (consumerMetricsMap) { + final Map aggregatedMetrics = new HashMap<>(); + consumerMetricsMap.forEach((c, metricsMap) -> { + Double value = 0.0; + metricsMap.forEach((metricName, metricValue) -> { + if (metricValue instanceof Double) { + aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 0.0)); + } + }); + }); + aggregatedMetrics.forEach((name, value) -> { + pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value); + }); + } + } + public void update(final KafkaConsumer consumer) { Map cmetrics = null; synchronized(consumerMetricsMap) { cmetrics = consumerMetricsMap.get(consumer); } - if (cmetrics == null) { + // This should not happen... + if (Objects.isNull(cmetrics)) { return; } Map metrics = consumer.metrics(); @@ -79,31 +97,19 @@ public void update(final KafkaConsumer consumer) { Metric value = entry.getValue(); String metricName = metric.name(); String metricGroup = metric.group(); - if ((metricName.contains("consumed")) || - ((!metric.tags().containsKey("partition")) && - (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) || - (metricName.equals("commit-rate") || metricName.equals("join-rate") || metricName.equals("commit-total")) || - (metricName.equals("incoming-byte-rate") || metricName.equals("outgoing-byte-rate"))) { - cmetrics.put(metricName, value.metricValue()); + if (Objects.nonNull(camelCaseMap.get(metricName))) { + if (metric.tags().containsKey("partition") && + (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) { + continue; + } + cmetrics.put(metricName, value.metricValue()); } } - synchronized (consumerMetricsMap) { - long curTime = Instant.now().getEpochSecond(); - if (curTime - updateTime > metricUpdateInterval) { - final Map aggregatedMetrics = new HashMap<>(); - consumerMetricsMap.forEach((c, metricsMap) -> { - Double value = 0.0; - metricsMap.forEach((metricName, metricValue) -> { - if (metricValue instanceof Double) { - aggregatedMetrics.put(metricName, ((Double)metricValue) + aggregatedMetrics.getOrDefault(metricName, 0.0)); - } - }); - }); - aggregatedMetrics.forEach((name, value) -> { - pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value); - }); - updateTime = curTime; - } + + long curTime = Instant.now().getEpochSecond(); + if (curTime - updateTime > metricUpdateInterval) { + aggregateMetrics(); + updateTime = curTime; } } }