From afda9f059edc34378b573c3254c099e5cf0eab57 Mon Sep 17 00:00:00 2001
From: Krishna Kondaka
Date: Fri, 6 Oct 2023 22:55:21 +0000
Subject: [PATCH] Kafka Fixes

Signed-off-by: Krishna Kondaka
---
 .../consumer/KafkaCustomConsumerFactory.java  | 29 ++----------
 .../kafka/producer/KafkaCustomProducer.java   |  2 +-
 .../plugins/kafka/service/TopicService.java   |  5 +-
 .../plugins/kafka/source/KafkaSource.java     | 26 ++---------
 .../kafka/util/SinkPropertyConfigurer.java    | 46 ++++++++++++++-----
 5 files changed, 47 insertions(+), 61 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
index 0e544e7f02..cf662e1b2e 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
@@ -29,6 +29,7 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
+import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
@@ -167,21 +168,7 @@ private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties pr
         MessageFormat dataFormat = topicConfig.getSerdeFormat();
         schemaType = dataFormat.toString();
         LOG.error("Setting schemaType to {}", schemaType);
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class);
-        switch (dataFormat) {
-            case JSON:
-                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceJsonDeserializer.class);
-                break;
-            case BYTES:
-                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-                break;
-            default:
-            case PLAINTEXT:
-                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                        StringDeserializer.class);
-                break;
-        }
+        SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
     }
 
     private void setPropertiesForSchemaRegistryConnectivity(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties) {
@@ -209,7 +196,6 @@ private void setPropertiesForSchemaType(final KafkaConsumerConfig kafkaConsumerC
         Map prop = properties;
         Map propertyMap = (Map) prop;
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl(kafkaConsumerConfig));
         properties.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false);
         final CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(properties.getProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG),
@@ -217,18 +203,11 @@ private void setPropertiesForSchemaType(final KafkaConsumerC
         try {
             schemaType = schemaRegistryClient.getSchemaMetadata(topic.getName() + "-value",
                     kafkaConsumerConfig.getSchemaConfig().getVersion()).getSchemaType();
+            SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
         } catch (IOException | RestClientException e) {
             LOG.error("Failed to connect to the schema registry...");
             throw new RuntimeException(e);
         }
-        if (schemaType.equalsIgnoreCase(MessageFormat.JSON.toString())) {
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
-        } else if (schemaType.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
-        } else {
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                    StringDeserializer.class);
-        }
     }
 
     private String getSchemaRegistryUrl(final KafkaConsumerConfig kafkaConsumerConfig) {
@@ -244,4 +223,4 @@ private void setupConfluentSchemaRegistry(final SchemaConfig schemaConfig, final
             throw new RuntimeException("RegistryURL must be specified for confluent schema registry");
         }
     }
-}
\ No newline at end of file
+}
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java
index edd94906c1..636797698f 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java
@@ -143,7 +143,7 @@ private Future send(final String topicName, String key, final Object record) {
     }
 
     private void publishJsonMessage(final Record<Event> record, final String key) throws IOException, ProcessingException {
-        final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
+        final JsonNode dataNode = record.getData().getJsonNode();
         if (validateJson(topicName, record.getData().toJsonString())) {
             send(topicName, key, dataNode);
         } else {
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/service/TopicService.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/service/TopicService.java
index 12d5142364..9e12d21e4f 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/service/TopicService.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/service/TopicService.java
@@ -6,6 +6,7 @@
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
 import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
 import org.slf4j.Logger;
@@ -29,7 +30,9 @@ public void createTopic(final String topicName, final Integer numberOfPartitions
             LOG.info(topicName + " created successfully");
         }
         catch (Exception e) {
-            LOG.error("Caught exception creating topic with name: {}", topicName, e);
+            if (!(e.getCause() instanceof TopicExistsException)) {
+                LOG.error("Caught exception creating topic with name: {}", topicName, e);
+            }
         }
     }
 
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index a74c1c4737..ac0cee9fb4 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -45,6 +45,7 @@
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
+import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
@@ -353,24 +354,12 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
     private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) {
         MessageFormat dataFormat = topicConfig.getSerdeFormat();
         schemaType = dataFormat.toString();
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class);
-        switch (dataFormat) {
-            case JSON:
-                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceJsonDeserializer.class);
-                break;
-            default:
-            case PLAINTEXT:
-                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                        StringDeserializer.class);
-                break;
-        }
+        SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
     }
 
     private void setPropertiesForSchemaType(Properties properties, TopicConfig topic) {
         Map prop = properties;
         Map propertyMap = (Map) prop;
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl());
         properties.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false);
         schemaRegistryClient = new CachedSchemaRegistryClient(properties.getProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG),
@@ -378,18 +367,11 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
         try {
             schemaType = schemaRegistryClient.getSchemaMetadata(topic.getName() + "-value",
                     sourceConfig.getSchemaConfig().getVersion()).getSchemaType();
+            SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
         } catch (IOException | RestClientException e) {
             LOG.error("Failed to connect to the schema registry...");
             throw new RuntimeException(e);
         }
-        if (schemaType.equalsIgnoreCase(MessageFormat.JSON.toString())) {
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
-        } else if (schemaType.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
-        } else {
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                    StringDeserializer.class);
-        }
     }
 
     private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
@@ -512,4 +494,4 @@ private String getMaskedBootStrapDetails(String serverIP) {
     protected void sleep(final long millis) throws InterruptedException {
         Thread.sleep(millis);
     }
-}
\ No newline at end of file
+}
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
index 5d54f688d2..c3359e63d2 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
@@ -10,8 +10,16 @@
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import io.confluent.kafka.serializers.KafkaJsonDeserializer;
+import io.confluent.kafka.serializers.KafkaJsonSerializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import org.opensearch.dataprepper.model.types.ByteCount;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
@@ -30,10 +38,6 @@ public class SinkPropertyConfigurer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SinkPropertyConfigurer.class);
 
-    private static final String VALUE_SERIALIZER = "value.serializer";
-
-    private static final String KEY_SERIALIZER = "key.serializer";
-
     private static final String SESSION_TIMEOUT_MS_CONFIG = "30000";
 
     private static final String REGISTRY_URL = "schema.registry.url";
@@ -114,7 +118,7 @@ public static Properties getProducerProperties(final KafkaProducerConfig kafkaPr
         setCommonServerProperties(properties, kafkaProducerConfig);
 
         setPropertiesForSerializer(properties, kafkaProducerConfig.getSerdeFormat());
-        
+
         if (kafkaProducerConfig.getSchemaConfig() != null) {
             setSchemaProps(kafkaProducerConfig.getSerdeFormat(), kafkaProducerConfig.getSchemaConfig(), properties);
         }
@@ -128,6 +132,9 @@ public static Properties getProducerProperties(final KafkaProducerConfig kafkaPr
     }
 
     private static void setAuthProperties(final KafkaProducerConfig kafkaSinkConfig, final Properties properties) {
+        if (kafkaSinkConfig.getAuthConfig() == null || kafkaSinkConfig.getAuthConfig().getSaslAuthConfig() == null) {
+            return;
+        }
         if (kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig() != null) {
             final String sslEndpointAlgorithm = kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getSslEndpointAlgorithm();
             if (null != sslEndpointAlgorithm && !sslEndpointAlgorithm.isEmpty() && sslEndpointAlgorithm.equalsIgnoreCase("https")) {
@@ -152,15 +159,30 @@ private static void setCommonServerProperties(final Properties properties, final
         properties.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG);
     }
 
+    public static void setPropertiesForDeserializer(final Properties properties, final String serdeFormat) {
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        Class deserializer = StringDeserializer.class;
+        if (serdeFormat.equalsIgnoreCase(MessageFormat.BYTES.toString())) {
+            deserializer = ByteArrayDeserializer.class;
+        } else if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) {
+            deserializer = KafkaJsonDeserializer.class;
+        } else if (serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
+            deserializer = KafkaAvroDeserializer.class;
+        }
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
+    }
+
     private static void setPropertiesForSerializer(final Properties properties, final String serdeFormat) {
-        properties.put(KEY_SERIALIZER, StringSerializer.class.getName());
-        if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) {
-            properties.put(VALUE_SERIALIZER, JsonSerializer.class.getName());
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        Class serializer = StringSerializer.class;
+        if (serdeFormat.equalsIgnoreCase(MessageFormat.BYTES.toString())) {
+            serializer = ByteArraySerializer.class;
+        } else if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) {
+            serializer = KafkaJsonSerializer.class;
         } else if (serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
-            properties.put(VALUE_SERIALIZER, KafkaAvroSerializer.class.getName());
-        } else {
-            properties.put(VALUE_SERIALIZER, StringSerializer.class.getName());
+            serializer = KafkaAvroSerializer.class;
         }
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
     }
 
     private static void validateForRegistryURL(final String serdeFormat, final SchemaConfig schemaConfig) {