diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index a5726e0374..fa53cd6ad1 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -100,19 +100,8 @@ public void setAttribute(final String key, final Object value) { public Object getAttribute(final String attributeKey) { String key = (attributeKey.charAt(0) == '/') ? attributeKey.substring(1) : attributeKey; - Map mapObject = attributes; - if (key.contains("/")) { - String[] keys = key.split("/"); - for (int i = 0; i < keys.length-1; i++) { - Object value = mapObject.get(keys[i]); - if (value == null || !(value instanceof Map)) { - return null; - } - mapObject = (Map)value; - key = keys[i+1]; - } - } - return mapObject.get(key); + // Does not support recursive or inner-object lookups for now.
+ return attributes.get(key); } @Override diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java index 057ce4d1a6..c87bf1a101 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java @@ -134,25 +134,6 @@ public void testSetAttribute(String key, final Object value) { assertThat(eventMetadata.getAttribute(key), equalTo(value)); } - private static Stream getNestedAttributeTestInputs() { - return Stream.of(Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", "v3")), "k1", "v1"), - Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", "v3")), "k2/k3", "v3"), - Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", Map.of("k4", 4))), "k2/k3/k4", 4), - Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", 4)), "k2/k3/k4", null), - Arguments.of(Map.of("k1","v1"),"k1", "v1")); - } - - @ParameterizedTest - @MethodSource("getNestedAttributeTestInputs") - public void testNestedGetAttribute(Map attributes, final String key, final Object expectedValue) { - eventMetadata = DefaultEventMetadata.builder() - .withEventType(testEventType) - .withTimeReceived(testTimeReceived) - .withAttributes(attributes) - .build(); - assertThat(eventMetadata.getAttribute(key), equalTo(expectedValue)); - } - @Test public void test_with_ExternalOriginationTime() { Instant now = Instant.now(); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 53eeefdf25..19e66e134e 100644 --- 
a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -13,9 +13,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeader; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,10 +36,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -54,7 +49,6 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -62,7 +56,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - public class KafkaSourceJsonTypeIT { private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceJsonTypeIT.class); private static final int TEST_ID = 123456; @@ -105,10 +98,6 @@ public class KafkaSourceJsonTypeIT { private String testKey; private String testTopic; private String testGroup; - private String headerKey1; - private byte[] headerValue1; - private String headerKey2; - private byte[] headerValue2; public KafkaSource createObjectUnderTest() { return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, 
pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable); @@ -116,10 +105,6 @@ public KafkaSource createObjectUnderTest() { @BeforeEach public void setup() throws Throwable { - headerKey1 = RandomStringUtils.randomAlphabetic(6); - headerValue1 = RandomStringUtils.randomAlphabetic(10).getBytes(StandardCharsets.UTF_8); - headerKey2 = RandomStringUtils.randomAlphabetic(5); - headerValue2 = RandomStringUtils.randomAlphabetic(15).getBytes(StandardCharsets.UTF_8); sourceConfig = mock(KafkaSourceConfig.class); pluginMetrics = mock(PluginMetrics.class); counter = mock(Counter.class); @@ -224,13 +209,6 @@ public void TestJsonRecordsWithNullKey() throws Exception { assertThat(map.get("kafka_key"), equalTo(null)); assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); - Map kafkaHeaders = (Map) metadata.getAttributes().get("kafka_headers"); - assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1)); - assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2)); - assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null))); - assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime")); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1)); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2)); } } @@ -262,13 +240,6 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception { assertThat(map.get("status"), equalTo(true)); assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); - Map kafkaHeaders = (Map) metadata.getAttributes().get("kafka_headers"); - assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1)); - assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2)); - 
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null))); - assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime")); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1)); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2)); event.getEventHandle().release(false); } receivedRecords.clear(); @@ -287,13 +258,6 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception { assertThat(map.get("status"), equalTo(true)); assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); - Map kafkaHeaders = (Map) metadata.getAttributes().get("kafka_headers"); - assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1)); - assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2)); - assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null))); - assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime")); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1)); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2)); event.getEventHandle().release(true); } } @@ -325,13 +289,6 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception { assertThat(map.get("status"), equalTo(true)); assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); - Map kafkaHeaders = (Map) metadata.getAttributes().get("kafka_headers"); - assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1)); - assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2)); - assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null))); - assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime")); - 
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1)); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2)); } } @@ -363,13 +320,6 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception { assertThat(map.get("kafka_key"), equalTo(testKey)); assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); - Map kafkaHeaders = (Map) metadata.getAttributes().get("kafka_headers"); - assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1)); - assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2)); - assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null))); - assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime")); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1)); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2)); } } @@ -401,13 +351,6 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception { assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey)); assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic)); assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0")); - Map kafkaHeaders = (Map) metadata.getAttributes().get("kafka_headers"); - assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1)); - assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2)); - assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null))); - assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime")); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1)); - assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2)); } } @@ -421,12 +364,8 @@ public void produceJsonRecords(final String servers, 
final String topicName, fin KafkaProducer producer = new KafkaProducer(props); for (int i = 0; i < numRecords; i++) { String value = "{\"name\":\"testName" + i + "\", \"id\":" + (TEST_ID + i) + ", \"status\":true}"; - List
headers = Arrays.asList( - new RecordHeader(headerKey1, headerValue1), - new RecordHeader(headerKey2, headerValue2) - ); ProducerRecord record = - new ProducerRecord<>(topicName, null, testKey, value, new RecordHeaders(headers)); + new ProducerRecord<>(topicName, testKey, value); producer.send(record); try { Thread.sleep(100); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java index 56377c1f22..ee0f6557de 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java @@ -65,11 +65,6 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig @Size(min = 1, max = 255, message = "size of group id should be between 1 and 255") private String groupId; - @JsonProperty("client_id") - @Valid - @Size(min = 1, max = 255, message = "size of client id should be between 1 and 255") - private String clientId; - @JsonProperty("workers") @Valid @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200") @@ -140,11 +135,6 @@ public String getGroupId() { return groupId; } - @Override - public String getClientId() { - return clientId; - } - @Override public Duration getCommitInterval() { return commitInterval; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java index 0f8de7b458..0ae2126cbe 100644 --- 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java @@ -16,8 +16,6 @@ public interface TopicConsumerConfig extends TopicConfig { String getGroupId(); - String getClientId(); - Boolean getAutoCommit(); String getAutoOffsetReset(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 9e6a979d8e..f6024ef4e7 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -17,8 +17,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.RecordDeserializationException; @@ -428,16 +426,6 @@ private Record getRecord(ConsumerRecord consumerRecord, in if (kafkaKeyMode == KafkaKeyMode.INCLUDE_AS_METADATA) { eventMetadata.setAttribute("kafka_key", key); } - Headers headers = consumerRecord.headers(); - if (headers != null) { - Map headerData = new HashMap<>(); - for (Header header: headers) { - headerData.put(header.key(), header.value()); - } - eventMetadata.setAttribute("kafka_headers", headerData); - } - eventMetadata.setAttribute("kafka_timestamp", consumerRecord.timestamp()); - 
eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString()); eventMetadata.setAttribute("kafka_topic", topicName); eventMetadata.setAttribute("kafka_partition", String.valueOf(partition)); eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp())); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index d703538e42..e4f0529ef8 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -134,19 +134,14 @@ private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig, break; } } - setConsumerTopicProperties(properties, topicConfig, topicConfig.getGroupId()); + setConsumerTopicProperties(properties, topicConfig); setSchemaRegistryProperties(sourceConfig, properties, topicConfig); LOG.debug("Starting consumer with the properties : {}", properties); return properties; } - - public static void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig, - final String groupId) { - properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - if (Objects.nonNull(topicConfig.getClientId())) { - properties.put(ConsumerConfig.CLIENT_ID_CONFIG, topicConfig.getClientId()); - } + private void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig) { + properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes()); 
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue()); properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue()); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 6a01a91bf0..3877350d3f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -37,7 +37,6 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer; -import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate; import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier; import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType; @@ -319,7 +318,25 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic } private void setConsumerTopicProperties(Properties properties, TopicConsumerConfig topicConfig) { - KafkaCustomConsumerFactory.setConsumerTopicProperties(properties, topicConfig, consumerGroupID); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID); + properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes()); + properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue()); + 
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue()); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + topicConfig.getAutoCommit()); + properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + ((Long) topicConfig.getCommitInterval().toMillis()).intValue()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + topicConfig.getAutoOffsetReset()); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + topicConfig.getConsumerMaxPollRecords()); + properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, + ((Long) topicConfig.getMaxPollInterval().toMillis()).intValue()); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue()); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue()); + properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes()); + properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait()); + properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes()); } private void setPropertiesForSchemaRegistryConnectivity(Properties properties) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java index 703fcded19..adcf030f1f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java @@ -49,11 +49,6 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig @Size(min = 1, max = 255, message = "size of group id should be 
between 1 and 255") private String groupId; - @JsonProperty("client_id") - @Valid - @Size(min = 1, max = 255, message = "size of client id should be between 1 and 255") - private String clientId; - @JsonProperty("workers") @Valid @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200") @@ -126,11 +121,6 @@ public String getGroupId() { return groupId; } - @Override - public String getClientId() { - return clientId; - } - @Override public MessageFormat getSerdeFormat() { return serdeFormat; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java index 1503a7424d..ab7b07c9b0 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java @@ -82,7 +82,6 @@ class KafkaSourceTest { private PluginConfigObservable pluginConfigObservable; private static final String TEST_GROUP_ID = "testGroupId"; - private static final String TEST_CLIENT_ID = "testClientId"; public KafkaSource createObjectUnderTest() { return new KafkaSource( @@ -108,8 +107,6 @@ void setUp() throws Exception { when(topic2.getConsumerMaxPollRecords()).thenReturn(1); when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID); when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID); - when(topic1.getClientId()).thenReturn(TEST_CLIENT_ID); - when(topic2.getClientId()).thenReturn(TEST_CLIENT_ID); when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5)); @@ -159,18 +156,6 @@ void test_kafkaSource_basicFunctionality() { 
assertTrue(Objects.nonNull(kafkaSource.getConsumer())); } - @Test - void test_kafkaSource_basicFunctionalityWithClientIdNull() { - when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); - when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); - when(topic1.getClientId()).thenReturn(null); - when(topic1.getClientId()).thenReturn(null); - kafkaSource = createObjectUnderTest(); - assertTrue(Objects.nonNull(kafkaSource)); - kafkaSource.start(buffer); - assertTrue(Objects.nonNull(kafkaSource.getConsumer())); - } - @Test void test_kafkaSource_retry_consumer_create() throws InterruptedException { when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));