diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 584ee24eb5..dff3d2b943 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -119,6 +119,7 @@ public void setup() { when(jsonTopic.getName()).thenReturn(testTopic); when(jsonTopic.getGroupId()).thenReturn(testGroup); when(jsonTopic.getWorkers()).thenReturn(1); + when(jsonTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(jsonTopic.getAutoCommit()).thenReturn(false); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java index 6d52bba0ea..6179ba4f57 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java @@ -130,6 +130,7 @@ public void setup() { when(plainTextTopic.getName()).thenReturn(testTopic); when(plainTextTopic.getGroupId()).thenReturn(testGroup); when(plainTextTopic.getWorkers()).thenReturn(1); + when(plainTextTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); when(plainTextTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(plainTextTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(plainTextTopic.getAutoCommit()).thenReturn(false); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java index 8aec89c864..6b240635d2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicMetrics.java @@ -37,6 +37,7 @@ public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetri camelCaseMap.put("records-lag-max", "recordsLagMax"); camelCaseMap.put("records-lead-min", "recordsLeadMin"); camelCaseMap.put("commit-rate", "commitRate"); + camelCaseMap.put("commit-total", "commitTotal"); camelCaseMap.put("join-rate", "joinRate"); camelCaseMap.put("incoming-byte-rate", "incomingByteRate"); camelCaseMap.put("outgoing-byte-rate", "outgoingByteRate"); @@ -81,7 +82,7 @@ public void update(final KafkaConsumer consumer) { if ((metricName.contains("consumed")) || ((!metric.tags().containsKey("partition")) && (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) || - (metricName.equals("commit-rate") || metricName.equals("join-rate")) || + (metricName.equals("commit-rate") || metricName.equals("join-rate") || metricName.equals("commit-total")) || (metricName.equals("incoming-byte-rate") || metricName.equals("outgoing-byte-rate"))) { cmetrics.put(metricName, value.metricValue()); }