Kafka Fixes
Signed-off-by: Krishna Kondaka <[email protected]>
Krishna Kondaka committed Oct 6, 2023
1 parent 36b0b9c commit afda9f0
Showing 5 changed files with 47 additions and 61 deletions.
===== File 1 of 5 =====
@@ -29,6 +29,7 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
+import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
@@ -167,21 +168,7 @@ private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) {
     MessageFormat dataFormat = topicConfig.getSerdeFormat();
     schemaType = dataFormat.toString();
     LOG.error("Setting schemaType to {}", schemaType);
-    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            StringDeserializer.class);
-    switch (dataFormat) {
-        case JSON:
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceJsonDeserializer.class);
-            break;
-        case BYTES:
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-            break;
-        default:
-        case PLAINTEXT:
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                    StringDeserializer.class);
-            break;
-    }
+    SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
 }

 private void setPropertiesForSchemaRegistryConnectivity(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties) {
@@ -209,26 +196,18 @@ private void setPropertiesForSchemaRegistryConnectivity(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties) {
 private void setPropertiesForSchemaType(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topic) {
     Map prop = properties;
     Map<String, String> propertyMap = (Map<String, String>) prop;
-    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
     properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl(kafkaConsumerConfig));
     properties.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false);
     final CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(properties.getProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG),
             100, propertyMap);
     try {
         schemaType = schemaRegistryClient.getSchemaMetadata(topic.getName() + "-value",
                 kafkaConsumerConfig.getSchemaConfig().getVersion()).getSchemaType();
+        SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
     } catch (IOException | RestClientException e) {
         LOG.error("Failed to connect to the schema registry...");
         throw new RuntimeException(e);
     }
-    if (schemaType.equalsIgnoreCase(MessageFormat.JSON.toString())) {
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
-    } else if (schemaType.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
-    } else {
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class);
-    }
 }

 private String getSchemaRegistryUrl(final KafkaConsumerConfig kafkaConsumerConfig) {
@@ -244,4 +223,4 @@ private void setupConfluentSchemaRegistry(final SchemaConfig schemaConfig, final
         throw new RuntimeException("RegistryURL must be specified for confluent schema registry");
     }
 }
-}
+}
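The deleted switch and the new one-line call are intended to be behavior-equivalent for this factory, including the default/PLAINTEXT fall-through to StringDeserializer. A minimal usage sketch of the shared helper (the helper itself is added later in this commit, in SinkPropertyConfigurer; the harness around the call is illustrative, not code from the commit):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;

class DeserializerWiringSketch {
    public static void main(String[] args) {
        final Properties properties = new Properties();
        // One call replaces the per-factory switch over JSON/BYTES/PLAINTEXT:
        SinkPropertyConfigurer.setPropertiesForDeserializer(properties, MessageFormat.JSON.toString());
        // Keys always use StringDeserializer; the value deserializer follows the serde format.
        System.out.println(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
        System.out.println(properties.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); // KafkaJsonDeserializer
    }
}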
===== File 2 of 5 =====
@@ -143,7 +143,7 @@ private Future send(final String topicName, String key, final Object record) {
 }

 private void publishJsonMessage(final Record<Event> record, final String key) throws IOException, ProcessingException {
-    final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
+    final JsonNode dataNode = record.getData().getJsonNode();
     if (validateJson(topicName, record.getData().toJsonString())) {
         send(topicName, key, dataNode);
     } else {
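The one-line producer fix above corrects a Jackson misuse: ObjectMapper.convertValue applied to the String returned by toJsonString() yields a TextNode holding the serialized text rather than a parsed tree, so the sink would publish a quoted JSON blob. getJsonNode() returns the event's already-parsed tree with no extra serialize/parse cycle. A standalone sketch of the Jackson behavior (plain Jackson, no Data Prepper types):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class ConvertValueVsReadTree {
    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final String json = "{\"status\":\"ok\"}";
        // convertValue treats the String as a value and wraps it:
        final JsonNode wrapped = mapper.convertValue(json, JsonNode.class);
        // readTree actually parses it:
        final JsonNode parsed = mapper.readTree(json);
        System.out.println(wrapped.isTextual()); // true  (a quoted blob)
        System.out.println(parsed.isObject());   // true  (a real object tree)
    }
}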
===== File 3 of 5 =====
@@ -6,6 +6,7 @@

 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
 import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
 import org.slf4j.Logger;
@@ -29,7 +30,9 @@ public void createTopic(final String topicName, final Integer numberOfPartitions
         LOG.info(topicName + " created successfully");

     } catch (Exception e) {
-        LOG.error("Caught exception creating topic with name: {}", topicName, e);
+        if (!(e.getCause() instanceof TopicExistsException)) {
+            LOG.error("Caught exception creating topic with name: {}", topicName, e);
+        }
     }
 }

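Kafka's admin client surfaces an already-existing topic as an ExecutionException wrapping TopicExistsException, which is why the guard above inspects e.getCause(); swallowing only that cause makes createTopic idempotent while still logging real failures. A minimal sketch of the same pattern against the plain AdminClient API (client construction and topic settings are illustrative):

import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

class IdempotentTopicCreate {
    static void ensureTopic(final AdminClient adminClient, final String topicName)
            throws InterruptedException, ExecutionException {
        try {
            adminClient.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
        } catch (final ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e; // "already exists" is the only benign outcome here
            }
        }
    }
}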
===== File 4 of 5 =====
@@ -45,6 +45,7 @@
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
+import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
@@ -353,43 +354,24 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
 private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) {
     MessageFormat dataFormat = topicConfig.getSerdeFormat();
     schemaType = dataFormat.toString();
-    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            StringDeserializer.class);
-    switch (dataFormat) {
-        case JSON:
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceJsonDeserializer.class);
-            break;
-        default:
-        case PLAINTEXT:
-            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                    StringDeserializer.class);
-            break;
-    }
+    SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
 }

 private void setPropertiesForSchemaType(Properties properties, TopicConfig topic) {
     Map prop = properties;
     Map<String, String> propertyMap = (Map<String, String>) prop;
-    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
     properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl());
     properties.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false);
     schemaRegistryClient = new CachedSchemaRegistryClient(properties.getProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG),
             100, propertyMap);
     try {
         schemaType = schemaRegistryClient.getSchemaMetadata(topic.getName() + "-value",
                 sourceConfig.getSchemaConfig().getVersion()).getSchemaType();
+        SinkPropertyConfigurer.setPropertiesForDeserializer(properties, schemaType);
     } catch (IOException | RestClientException e) {
         LOG.error("Failed to connect to the schema registry...");
         throw new RuntimeException(e);
     }
-    if (schemaType.equalsIgnoreCase(MessageFormat.JSON.toString())) {
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
-    } else if (schemaType.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
-    } else {
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class);
-    }
 }

 private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
@@ -512,4 +494,4 @@ private String getMaskedBootStrapDetails(String serverIP) {
 protected void sleep(final long millis) throws InterruptedException {
     Thread.sleep(millis);
 }
-}
+}
===== File 5 of 5: SinkPropertyConfigurer.java =====
@@ -10,8 +10,16 @@
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import io.confluent.kafka.serializers.KafkaJsonDeserializer;
+import io.confluent.kafka.serializers.KafkaJsonSerializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import org.opensearch.dataprepper.model.types.ByteCount;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
@@ -30,10 +38,6 @@ public class SinkPropertyConfigurer {

     private static final Logger LOG = LoggerFactory.getLogger(SinkPropertyConfigurer.class);

-    private static final String VALUE_SERIALIZER = "value.serializer";
-
-    private static final String KEY_SERIALIZER = "key.serializer";
-
     private static final String SESSION_TIMEOUT_MS_CONFIG = "30000";

     private static final String REGISTRY_URL = "schema.registry.url";
@@ -114,7 +118,7 @@ public static Properties getProducerProperties(final KafkaProducerConfig kafkaProducerConfig) {
     setCommonServerProperties(properties, kafkaProducerConfig);

     setPropertiesForSerializer(properties, kafkaProducerConfig.getSerdeFormat());
-
+
     if (kafkaProducerConfig.getSchemaConfig() != null) {
         setSchemaProps(kafkaProducerConfig.getSerdeFormat(), kafkaProducerConfig.getSchemaConfig(), properties);
     }
@@ -128,6 +132,9 @@ public static Properties getProducerProperties(final KafkaProducerConfig kafkaProducerConfig) {
 }

 private static void setAuthProperties(final KafkaProducerConfig kafkaSinkConfig, final Properties properties) {
+    if (kafkaSinkConfig.getAuthConfig() == null || kafkaSinkConfig.getAuthConfig().getSaslAuthConfig() == null) {
+        return;
+    }
     if (kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig() != null) {
         final String sslEndpointAlgorithm = kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getSslEndpointAlgorithm();
         if (null != sslEndpointAlgorithm && !sslEndpointAlgorithm.isEmpty() && sslEndpointAlgorithm.equalsIgnoreCase("https")) {
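The early return added above guards the getAuthConfig().getSaslAuthConfig() dereference chain: a sink configured without an authentication block would otherwise throw a NullPointerException before any producer property was set. The same guard as a self-contained sketch (the config interfaces are simplified stand-ins for the Data Prepper classes):

import java.util.Properties;

class AuthGuardSketch {
    interface SaslAuthConfig { }
    interface AuthConfig { SaslAuthConfig getSaslAuthConfig(); }
    interface SinkConfig { AuthConfig getAuthConfig(); }

    static void setAuthProperties(final SinkConfig config, final Properties properties) {
        if (config.getAuthConfig() == null || config.getAuthConfig().getSaslAuthConfig() == null) {
            return; // unauthenticated cluster: leave SASL properties unset
        }
        // ... SASL/SSL wiring would follow here, as in the real method ...
    }

    public static void main(String[] args) {
        setAuthProperties(() -> null, new Properties()); // no auth block: returns quietly
    }
}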
@@ -152,15 +159,30 @@ private static void setCommonServerProperties(final Properties properties, final
     properties.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG);
 }

+public static void setPropertiesForDeserializer(final Properties properties, final String serdeFormat) {
+    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    Class deserializer = StringDeserializer.class;
+    if (serdeFormat.equalsIgnoreCase(MessageFormat.BYTES.toString())) {
+        deserializer = ByteArrayDeserializer.class;
+    } else if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) {
+        deserializer = KafkaJsonDeserializer.class;
+    } else if (serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
+        deserializer = KafkaAvroDeserializer.class;
+    }
+    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
+}
+
 private static void setPropertiesForSerializer(final Properties properties, final String serdeFormat) {
-    properties.put(KEY_SERIALIZER, StringSerializer.class.getName());
-    if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) {
-        properties.put(VALUE_SERIALIZER, JsonSerializer.class.getName());
+    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    Class serializer = StringSerializer.class;
+    if (serdeFormat.equalsIgnoreCase(MessageFormat.BYTES.toString())) {
+        serializer = ByteArraySerializer.class;
+    } else if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) {
+        serializer = KafkaJsonSerializer.class;
     } else if (serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())) {
-        properties.put(VALUE_SERIALIZER, KafkaAvroSerializer.class.getName());
-    } else {
-        properties.put(VALUE_SERIALIZER, StringSerializer.class.getName());
+        serializer = KafkaAvroSerializer.class;
     }
+    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
 }

 private static void validateForRegistryURL(final String serdeFormat, final SchemaConfig schemaConfig) {
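One detail of the serializer refactor worth noting: the old code stored class-name Strings under the hand-rolled "key.serializer"/"value.serializer" constants, while the new code stores Class objects under the ProducerConfig constants. Kafka's config accepts both forms for serializer settings, so this is a readability and typo-safety cleanup rather than a behavior change. A small standalone sketch of the equivalence (not code from the commit):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

class SerializerConfigForms {
    public static void main(String[] args) {
        final Properties oldStyle = new Properties();
        oldStyle.put("key.serializer", StringSerializer.class.getName()); // class-name String
        final Properties newStyle = new Properties();
        newStyle.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Class object
        // Both resolve to the same serializer; the ProducerConfig constant is
        // itself the string "key.serializer".
        System.out.println(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG.equals("key.serializer")); // true
    }
}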
