From 058278853d53a6dbd9cdd0dfbeb421530603e165 Mon Sep 17 00:00:00 2001
From: rajeshLovesToCode <131366272+rajeshLovesToCode@users.noreply.github.com>
Date: Mon, 7 Aug 2023 16:07:18 +0000
Subject: [PATCH] -Support for kafka-sink (#2999)

* -Support for kafka-sink

Signed-off-by: rajeshLovesToCode

* -Support for kafka-sink

Signed-off-by: rajeshLovesToCode
---
 .../kafka-plugins/build.gradle                |   7 +
 .../kafka/configuration/AuthConfig.java       |  14 +-
 .../KafkaProducerProperties.java              | 250 ++++++++++++++
 .../kafka/configuration/KafkaSinkConfig.java  | 140 ++++++++
 .../kafka/producer/KafkaSinkProducer.java     | 196 +++++++++++
 .../kafka/producer/ProducerWorker.java        |  33 ++
 .../plugins/kafka/sink/DLQSink.java           |  94 +++++
 .../plugins/kafka/sink/KafkaSink.java         | 166 +++++++++
 .../AuthenticationPropertyConfigurer.java     | 144 ++++++++
 .../kafka/util/SinkPropertyConfigurer.java    | 323 ++++++++++++++++++
 .../configuration/KafkaSinkConfigTest.java    | 106 ++++++
 .../kafka/producer/KafkaSinkProducerTest.java | 150 ++++++++
 .../kafka/producer/ProducerWorkerTest.java    |  50 +++
 .../plugins/kafka/sink/DLQSinkTest.java       |  96 ++++++
 .../plugins/kafka/sink/KafkasinkTest.java     | 191 +++++++++++
 .../AuthenticationPropertyConfigurerTest.java |  75 ++++
 .../util/SinkPropertyConfigurerTest.java      |  92 +++++
 .../sample-pipelines-sink-oauth.yaml          |  34 ++
 .../resources/sample-pipelines-sink-ssl.yaml  |  56 +++
 .../test/resources/sample-pipelines-sink.yaml |  57 ++++
 20 files changed, 2271 insertions(+), 3 deletions(-)
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerProperties.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java
 create mode 100644
data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-ssl.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 899061b611..298f11fb86 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -43,6 +43,13 @@ dependencies { testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test' testImplementation 'org.apache.kafka:connect-json:3.4.0' testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39') + implementation project(':data-prepper-plugins:failures-common') + testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9' + implementation 'org.apache.kafka:connect-json:3.4.0' + implementation 'com.github.fge:json-schema-validator:2.2.14' + implementation 'commons-collections:commons-collections:3.2.2' + + } test { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java index 3b59513882..4447f81a97 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java @@ -6,8 +6,8 @@ package org.opensearch.dataprepper.plugins.kafka.configuration; import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.validation.constraints.AssertTrue; import jakarta.validation.Valid; +import jakarta.validation.constraints.AssertTrue; import java.util.stream.Stream; @@ -27,6 +27,9 @@ public static class SaslAuthConfig { @JsonProperty("aws_msk_iam") private AwsIamAuthConfig awsIamAuthConfig; + @JsonProperty("ssl_endpoint_identification_algorithm") + private String sslEndpointAlgorithm; + public AwsIamAuthConfig getAwsIamAuthConfig() { return awsIamAuthConfig; } @@ -39,14 +42,19 @@ public OAuthConfig getOAuthConfig() { return oAuthConfig; } + public String getSslEndpointAlgorithm() { + return sslEndpointAlgorithm; + } + @AssertTrue(message = "Only one of AwsIam or oAuth or PlainText auth config must be specified") public boolean hasOnlyOneConfig() { - return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n!=null).count() == 1; + return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n != null).count() == 1; } } - public static class SslAuthConfig { + + public static class SslAuthConfig { // TODO Add Support for SSL authentication types like // one-way or two-way authentication diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerProperties.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerProperties.java new file mode 100644 index 0000000000..6d624fcd84 --- /dev/null +++ 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerProperties.java @@ -0,0 +1,250 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Duration; +import java.util.List; + +public class KafkaProducerProperties { + private static final String DEFAULT_BYTE_CAPACITY = "50mb"; + + @JsonProperty("buffer_memory") + private String bufferMemory = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("compression_type") + private String compressionType; + + @JsonProperty("retries") + private int retries; + + @JsonProperty("batch_size") + private int batchSize; + + @JsonProperty("client_dns_lookup") + private String clientDnsLookup; + + @JsonProperty("client_id") + private String clientId; + + @JsonProperty("connections_max_idle") + private Duration connectionsMaxIdleMs; + + @JsonProperty("delivery_timeout") + private Duration deliveryTimeoutMs; + + @JsonProperty("linger_ms") + private Long lingerMs; + + @JsonProperty("max_block") + private Duration maxBlockMs; + + @JsonProperty("max_request_size") + private int maxRequestSize; + + @JsonProperty("partitioner_class") + private Class partitionerClass; + + @JsonProperty("partitioner_ignore_keys") + private Boolean partitionerIgnoreKeys; + + @JsonProperty("receive_buffer") + private String receiveBufferBytes=DEFAULT_BYTE_CAPACITY; + + @JsonProperty("request_timeout") + private Duration requestTimeoutMs; + + @JsonProperty("send_buffer") + private String sendBufferBytes=DEFAULT_BYTE_CAPACITY; + + @JsonProperty("socket_connection_setup_timeout_max") + private Duration socketConnectionSetupMaxTimeout; + + @JsonProperty("socket_connection_setup_timeout") + private Duration socketConnectionSetupTimeout; + + @JsonProperty("acks") + private String acks; + + @JsonProperty("enable_idempotence") + private Boolean enableIdempotence; + + @JsonProperty("interceptor_classes") + private List interceptorClasses; + + @JsonProperty("max_in_flight_requests_per_connection") + private int maxInFlightRequestsPerConnection; + + @JsonProperty("metadata_max_age") + private Duration metadataMaxAgeMs; + + @JsonProperty("metadata_max_idle") + private Duration metadataMaxIdleMs; + + @JsonProperty("metric_reporters") + private List metricReporters; + + @JsonProperty("metrics_num_samples") + private int metricsNumSamples; + + @JsonProperty("metrics_recording_level") + private String metricsRecordingLevel; + + @JsonProperty("metrics_sample_window") + private Duration metricsSampleWindowMs; + + @JsonProperty("partitioner_adaptive_partitioning_enable") + private boolean partitionerAdaptivePartitioningEnable; + + @JsonProperty("partitioner_availability_timeout") + private Duration partitionerAvailabilityTimeoutMs; + + @JsonProperty("reconnect_backoff_max") + private Duration reconnectBackoffMaxMs; + + @JsonProperty("reconnect_backoff") + private Duration reconnectBackoffMs; + + @JsonProperty("retry_backoff") + private Duration retryBackoffMs; + + + public String getCompressionType() { + return compressionType; + } + + public int getRetries() { + if (retries == 0) { + retries = 5; + } + return retries; + } + + public int getBatchSize() { + return batchSize; + } + + public String getClientDnsLookup() { + return clientDnsLookup; + } + + public String getClientId() { + return clientId; + } + + + public Long getLingerMs() { + return lingerMs; + } + + + public int getMaxRequestSize() { + return maxRequestSize; + } + + public Class 
getPartitionerClass() { + return partitionerClass; + } + + public Boolean getPartitionerIgnoreKeys() { + return partitionerIgnoreKeys; + } + + + public String getAcks() { + return acks; + } + + public Boolean getEnableIdempotence() { + return enableIdempotence; + } + + public List getInterceptorClasses() { + return interceptorClasses; + } + + public int getMaxInFlightRequestsPerConnection() { + return maxInFlightRequestsPerConnection; + } + + + public List getMetricReporters() { + return metricReporters; + } + + public int getMetricsNumSamples() { + return metricsNumSamples; + } + + public String getMetricsRecordingLevel() { + return metricsRecordingLevel; + } + + + public boolean isPartitionerAdaptivePartitioningEnable() { + return partitionerAdaptivePartitioningEnable; + } + + public String getBufferMemory() { + return bufferMemory; + } + + public Long getConnectionsMaxIdleMs() { + return connectionsMaxIdleMs.toMillis(); + } + + public Long getDeliveryTimeoutMs() { + return deliveryTimeoutMs.toMillis(); + } + + public Long getMaxBlockMs() { + return maxBlockMs.toMillis(); + } + + public String getReceiveBufferBytes() { + return receiveBufferBytes; + } + + public Long getRequestTimeoutMs() { + return requestTimeoutMs.toMillis(); + } + + public String getSendBufferBytes() { + return sendBufferBytes; + } + + public Long getSocketConnectionSetupMaxTimeout() { + return socketConnectionSetupMaxTimeout.toMillis(); + } + + public Long getSocketConnectionSetupTimeout() { + return socketConnectionSetupTimeout.toMillis(); + } + + public Long getMetadataMaxAgeMs() { + return metadataMaxAgeMs.toMillis(); + } + + public Long getMetadataMaxIdleMs() { + return metadataMaxIdleMs.toMillis(); + } + + public Long getMetricsSampleWindowMs() { + return metricsSampleWindowMs.toMillis(); + } + + public Long getPartitionerAvailabilityTimeoutMs() { + return partitionerAvailabilityTimeoutMs.toMillis(); + } + + public Long getReconnectBackoffMaxMs() { + return reconnectBackoffMaxMs.toMillis(); + } + + public Long getReconnectBackoffMs() { + return reconnectBackoffMs.toMillis(); + } + + public Long getRetryBackoffMs() { + return retryBackoffMs.toMillis(); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java new file mode 100644 index 0000000000..0e35750657 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java @@ -0,0 +1,140 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; +import org.apache.commons.lang3.ObjectUtils; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * * A helper class that helps to read user configuration values from + * pipelines.yaml + */ + +public class KafkaSinkConfig { + + public static final String DLQ = "dlq"; + + @JsonProperty("bootstrap_servers") + 
@NotNull + @Size(min = 1, message = "Bootstrap servers can't be empty") + private List bootStrapServers; + + private PluginModel dlq; + + public Optional getDlq() { + return Optional.ofNullable(dlq); + } + + public void setDlqConfig(final PluginSetting pluginSetting) { + final LinkedHashMap> dlq = (LinkedHashMap) pluginSetting.getAttributeFromSettings(DLQ); + if (dlq != null) { + if (dlq.size() != 1) { + throw new RuntimeException("dlq option must declare exactly one dlq configuration"); + } + final Map.Entry> entry = dlq.entrySet().stream() + .findFirst() + .get(); + + this.dlq = new PluginModel(entry.getKey(), entry.getValue()); + + } + } + + + @JsonProperty("thread_wait_time") + private Long threadWaitTime; + + + @JsonProperty("topic") + TopicConfig topic; + + @JsonProperty("authentication") + private AuthConfig authConfig; + + @JsonProperty("schema") + @Valid + private SchemaConfig schemaConfig; + + @JsonProperty("serde_format") + private String serdeFormat; + + + @JsonProperty("partition_key") + @NotNull + @NotEmpty + private String partitionKey; + + @JsonProperty("producer_properties") + private KafkaProducerProperties kafkaProducerProperties; + + public SchemaConfig getSchemaConfig() { + return schemaConfig; + } + + + public AuthConfig getAuthConfig() { + return authConfig; + } + + + public List getBootStrapServers() { + return bootStrapServers; + } + + public String getSerdeFormat() { + if (ObjectUtils.isEmpty(serdeFormat)) { + serdeFormat = "plaintext"; + } + return serdeFormat; + } + + public Long getThreadWaitTime() { + return threadWaitTime; + } + + public void setBootStrapServers(List bootStrapServers) { + this.bootStrapServers = bootStrapServers; + } + + public void setThreadWaitTime(Long threadWaitTime) { + this.threadWaitTime = threadWaitTime; + } + + public void setAuthConfig(AuthConfig authConfig) { + this.authConfig = authConfig; + } + + public void setSchemaConfig(SchemaConfig schemaConfig) { + this.schemaConfig = schemaConfig; + } + + public TopicConfig getTopic() { + return topic; + } + + public void setTopic(TopicConfig topic) { + this.topic = topic; + } + + public KafkaProducerProperties getKafkaProducerProperties() { + return kafkaProducerProperties; + } + + public String getPartitionKey() { + return partitionKey; + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java new file mode 100644 index 0000000000..2bd87b9200 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java @@ -0,0 +1,196 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.producer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.jsonschema.core.exceptions.ProcessingException; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchema; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import 
org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.Map; + + +/** + * * A helper class which helps takes the buffer data + * and produce it to a given kafka topic + */ + +public class KafkaSinkProducer { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkProducer.class); + + private final Producer producer; + + private final KafkaSinkConfig kafkaSinkConfig; + + private final DLQSink dlqSink; + + private final CachedSchemaRegistryClient schemaRegistryClient; + + private final Collection bufferedEventHandles; + + private final ExpressionEvaluator expressionEvaluator; + + private final String tagTargetKey; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public KafkaSinkProducer(final Producer producer, + final KafkaSinkConfig kafkaSinkConfig, + final DLQSink dlqSink, + final CachedSchemaRegistryClient schemaRegistryClient, + final ExpressionEvaluator expressionEvaluator, + final String tagTargetKey) { + this.producer = producer; + this.kafkaSinkConfig = kafkaSinkConfig; + this.dlqSink = dlqSink; + this.schemaRegistryClient = schemaRegistryClient; + bufferedEventHandles = new LinkedList<>(); + this.expressionEvaluator = expressionEvaluator; + this.tagTargetKey = tagTargetKey; + + } + + public void produceRecords(final Record record) { + if (record.getData().getEventHandle() != null) { + bufferedEventHandles.add(record.getData().getEventHandle()); + } + TopicConfig topic = kafkaSinkConfig.getTopic(); + Event event = getEvent(record); + final String key = event.formatString(kafkaSinkConfig.getPartitionKey(), expressionEvaluator); + Object dataForDlq = event.toJsonString(); + LOG.info("Producing record " + dataForDlq); + try { + final String serdeFormat = kafkaSinkConfig.getSerdeFormat(); + if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) { + publishJsonMessage(record, topic, key, dataForDlq); + } else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) { + publishAvroMessage(record, topic, key, dataForDlq); + } else { + publishPlaintextMessage(record, topic, key, dataForDlq); + } + } catch (Exception e) { + releaseEventHandles(false); + } + + } + + private Event getEvent(Record record) { + Event event = record.getData(); + try { + event = addTagsToEvent(event, tagTargetKey); + } catch (JsonProcessingException e) { + LOG.error("error occured while processing tag target key"); + } + return event; + } + + private void publishPlaintextMessage(Record record, TopicConfig topic, String key, Object dataForDlq) { + producer.send(new ProducerRecord(topic.getName(), key, 
record.getData().toJsonString()), callBack(dataForDlq)); + } + + private void publishAvroMessage(Record record, TopicConfig topic, String key, Object dataForDlq) throws RestClientException, IOException { + final String valueToParse = schemaRegistryClient. + getLatestSchemaMetadata(topic.getName() + "-value").getSchema(); + final Schema schema = new Schema.Parser().parse(valueToParse); + final GenericRecord genericRecord = getGenericRecord(record.getData(), schema); + producer.send(new ProducerRecord(topic.getName(), key, genericRecord), callBack(dataForDlq)); + } + + private void publishJsonMessage(Record record, TopicConfig topic, String key, Object dataForDlq) throws IOException, RestClientException, ProcessingException { + final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class); + if (validateJson(topic.getName(), dataForDlq)) { + producer.send(new ProducerRecord(topic.getName(), key, dataNode), callBack(dataForDlq)); + } else { + dlqSink.perform(dataForDlq, new RuntimeException("Invalid Json")); + } + } + + private Boolean validateJson(final String topicName, Object dataForDlq) throws IOException, RestClientException, ProcessingException { + if (schemaRegistryClient != null) { + final String schemaJson = schemaRegistryClient. + getLatestSchemaMetadata(topicName + "-value").getSchema(); + return validateSchema(dataForDlq.toString(), schemaJson); + } else { + return true; + } + } + + private boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode schemaNode = objectMapper.readTree(schemaJson); + JsonNode dataNode = objectMapper.readTree(jsonData); + JsonSchemaFactory schemaFactory = JsonSchemaFactory.byDefault(); + JsonSchema schema = schemaFactory.getJsonSchema(schemaNode); + ProcessingReport report = schema.validate(dataNode); + if (report.isSuccess()) { + return true; + } else { + return false; + } + } + + private Callback callBack(final Object dataForDlq) { + return (metadata, exception) -> { + if (null != exception) { + releaseEventHandles(false); + dlqSink.perform(dataForDlq, exception); + } else { + releaseEventHandles(true); + } + }; + } + + + private GenericRecord getGenericRecord(final Event event, final Schema schema) { + final GenericRecord record = new GenericData.Record(schema); + for (final String key : event.toMap().keySet()) { + record.put(key, event.toMap().get(key)); + } + return record; + } + + private void releaseEventHandles(final boolean result) { + for (final EventHandle eventHandle : bufferedEventHandles) { + eventHandle.release(result); + } + bufferedEventHandles.clear(); + } + + private Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException { + String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString(); + Map eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() { + }); + return JacksonLog.builder().withData(eventData).build(); + } + +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java new file mode 100644 index 0000000000..dbb05e7401 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch 
Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.producer;
+
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+
+/**
+ * A multithreaded helper class that produces the records to multiple topics in an
+ * asynchronous way.
+ */
+
+public class ProducerWorker implements Runnable {
+
+    private final Record<Event> record;
+    private final KafkaSinkProducer producer;
+
+
+    public ProducerWorker(final KafkaSinkProducer producer,
+                          final Record<Event> record) {
+        this.record = record;
+        this.producer = producer;
+    }
+
+    @Override
+    public void run() {
+        producer.produceRecords(record);
+    }
+
+}
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
new file mode 100644
index 0000000000..5b393d3f19
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+
+package org.opensearch.dataprepper.plugins.kafka.sink;
+
+import org.opensearch.dataprepper.metrics.MetricNames;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.failures.DlqObject;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
+import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import static java.util.UUID.randomUUID;
+import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;
+
+
+/**
+ * This class helps log failed data to an AWS S3 bucket.
+ */
+
+public class DLQSink {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DLQSink.class);
+
+    private final DlqProvider dlqProvider;
+    private final PluginSetting pluginSetting;
+
+    public DLQSink(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig, final PluginSetting pluginSetting) {
+        this.pluginSetting = pluginSetting;
+        this.dlqProvider = getDlqProvider(pluginFactory, kafkaSinkConfig);
+    }
+
+    public void perform(final Object failedData, final Exception e) {
+        final DlqWriter dlqWriter = getDlqWriter();
+        final DlqObject dlqObject = DlqObject.builder()
+                .withPluginId(randomUUID().toString())
+                .withPluginName(pluginSetting.getName())
+                .withPipelineName(pluginSetting.getPipelineName())
+                .withFailedData(failedData)
+                .build();
+        logFailureForDlqObjects(dlqWriter, List.of(dlqObject), e);
+    }
+
+    private DlqWriter getDlqWriter() {
+        final Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
+                .add(pluginSetting.getPipelineName())
+                .add(pluginSetting.getName()).toString());
+        final DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null;
+        return dlqWriter;
+    }
+
+    private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) {
+        final Map props = new HashMap<>();
+        kafkaSinkConfig.setDlqConfig(pluginSetting);
+        final Optional<PluginModel> dlq = kafkaSinkConfig.getDlq();
+        if (dlq.isPresent()) {
+            final PluginModel dlqPluginModel = dlq.get();
+            final PluginSetting dlqPluginSetting = new PluginSetting(dlqPluginModel.getPluginName(), dlqPluginModel.getPluginSettings());
+            dlqPluginSetting.setPipelineName(pluginSetting.getPipelineName());
+            final DlqProvider dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
+            return dlqProvider;
+        }
+        return null;
+    }
+
+    private void logFailureForDlqObjects(final DlqWriter dlqWriter, final List<DlqObject> dlqObjects, final Throwable failure) {
+        try {
+            dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginSetting.getName());
+            dlqObjects.forEach((dlqObject) -> {
+                dlqObject.releaseEventHandle(true);
+            });
+        } catch (final IOException e) {
+            dlqObjects.forEach(dlqObject -> {
+                LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e);
+                dlqObject.releaseEventHandle(false);
+            });
+        }
+    }
+}
+
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
new file mode 100644
index 0000000000..4a530e2794
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.sink;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.sink.AbstractSink;
+import org.opensearch.dataprepper.model.sink.Sink;
+import org.opensearch.dataprepper.model.sink.SinkContext;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig;
+import org.opensearch.dataprepper.plugins.kafka.producer.KafkaSinkProducer;
+import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker;
+import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation class of the kafka sink plugin. It is responsible for receiving the collection of
+ * {@link Event} and producing it to different kafka topics.
+ */
+@DataPrepperPlugin(name = "kafka", pluginType = Sink.class, pluginConfigurationType = KafkaSinkConfig.class)
+public class KafkaSink extends AbstractSink<Record<Event>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
+    private final KafkaSinkConfig kafkaSinkConfig;
+
+    private volatile boolean sinkInitialized;
+
+    private static final Integer totalWorkers = 1;
+
+    private ProducerWorker producerWorker;
+
+    private ExecutorService executorService;
+
+    private final PluginFactory pluginFactory;
+
+    private final PluginSetting pluginSetting;
+
+    private final ExpressionEvaluator expressionEvaluator;
+
+    private final SinkContext sinkContext;
+
+
+    @DataPrepperPluginConstructor
+    public KafkaSink(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaSinkConfig, final PluginFactory pluginFactory,
+                     final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext) {
+        super(pluginSetting);
+        this.pluginSetting = pluginSetting;
+        this.kafkaSinkConfig = kafkaSinkConfig;
+        this.pluginFactory = pluginFactory;
+        this.expressionEvaluator = expressionEvaluator;
+        this.sinkContext = sinkContext;
+
+
+    }
+
+
+    @Override
+    public void doInitialize() {
+        try {
+            doInitializeInternal();
+        } catch (InvalidPluginConfigurationException e) {
+            LOG.error("Invalid plugin configuration, hence failed to initialize the kafka-sink plugin.");
+            this.shutdown();
+            throw e;
+        } catch (Exception e) {
+            LOG.error("Failed to initialize the kafka-sink plugin.");
+            this.shutdown();
+            throw e;
+        }
+    }
+
+    private void doInitializeInternal() {
+        executorService = Executors.newFixedThreadPool(totalWorkers);
+        sinkInitialized = Boolean.TRUE;
+    }
+
+    @Override
+    public void doOutput(Collection<Record<Event>> records) {
+        if (records.isEmpty()) {
+            return;
+        }
+        try {
+            final KafkaSinkProducer producer = createProducer();
+            records.forEach(record -> {
+                producerWorker = new ProducerWorker(producer, record);
+                executorService.submit(producerWorker);
+            });
+
+        } catch (Exception e) {
+            LOG.error("Failed to write records to the Kafka sink.", e);
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    public KafkaSinkProducer createProducer() {
+        Properties properties = SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig);
+        properties = Objects.requireNonNull(properties);
+        return new KafkaSinkProducer(new KafkaProducer<>(properties),
+                kafkaSinkConfig, new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting), getSchemaRegistryClient(), expressionEvaluator
+                , Objects.nonNull(sinkContext) ?
sinkContext.getTagsTargetKey() : null); + } + + private CachedSchemaRegistryClient getSchemaRegistryClient() { + if (kafkaSinkConfig.getSchemaConfig() != null && kafkaSinkConfig.getSchemaConfig().getRegistryURL() != null) { + Properties schemaProps = new Properties(); + SinkPropertyConfigurer.setSchemaProps(kafkaSinkConfig, schemaProps); + Map propertiesMap = schemaProps; + return new CachedSchemaRegistryClient( + kafkaSinkConfig.getSchemaConfig().getRegistryURL(), + 100, propertiesMap); + } + return null; + } + + @Override + public void shutdown() { + try { + if (!executorService.awaitTermination( + calculateLongestThreadWaitingTime(), TimeUnit.MILLISECONDS)) { + executorService.shutdownNow(); + } + LOG.info("Sink threads are waiting for shutting down..."); + } catch (InterruptedException e) { + if (e.getCause() instanceof InterruptedException) { + LOG.error("Interrupted during sink shutdown, exiting uncleanly..."); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + super.shutdown(); + LOG.info("Producer shutdown successfully..."); + } + + private long calculateLongestThreadWaitingTime() { + return kafkaSinkConfig.getThreadWaitTime(); + } + + + @Override + public boolean isReady() { + return sinkInitialized; + } + +} + diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java new file mode 100644 index 0000000000..97b9951b97 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java @@ -0,0 +1,144 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; + +import java.util.Base64; +import java.util.Properties; + +/** + * * This is static property configurer dedicated to authencation related information given in pipeline.yml + */ + +public class AuthenticationPropertyConfigurer { + + private static final String SASL_MECHANISM = "sasl.mechanism"; + + private static final String SASL_SECURITY_PROTOCOL = "security.protocol"; + + private static final String SASL_JAS_CONFIG = "sasl.jaas.config"; + + private static final String SASL_CALLBACK_HANDLER_CLASS = "sasl.login.callback.handler.class"; + + private static final String SASL_JWKS_ENDPOINT_URL = "sasl.oauthbearer.jwks.endpoint.url"; + + private static final String SASL_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url"; + + private static final String PLAINTEXT_JAASCONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username= \"%s\" password= " + + " \"%s\";"; + private static final String OAUTH_JAASCONFIG = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='" + + "%s" + "' clientSecret='" + "%s" + "' scope='" + "%s" + "' OAUTH_LOGIN_SERVER='" + "%s" + + "' OAUTH_LOGIN_ENDPOINT='" + "%s" + "' OAUT_LOGIN_GRANT_TYPE=" + "%s" + + " OAUTH_LOGIN_SCOPE=%s OAUTH_AUTHORIZATION='Basic " + "%s" + "';"; + + private static final String INSTROSPECT_SERVER_PROPERTIES = " OAUTH_INTROSPECT_SERVER='" + + "%s" + "' OAUTH_INTROSPECT_ENDPOINT='" + "%s" + "' " + + "OAUTH_INTROSPECT_AUTHORIZATION='Basic " + 
"%s"; + + private static final String PLAIN_MECHANISM = "PLAIN"; + + private static final String SASL_PLAINTEXT_PROTOCOL = "SASL_PLAINTEXT"; + + private static final String SASL_SSL_PROTOCOL = "SASL_SSL"; + + private static final String SSL_ENDPOINT_IDENTIFICATION = "ssl.endpoint.identification.algorithm"; + + private static final String USER_INFO = "USER_INFO"; + + + public static void setSaslPlainTextProperties(final PlainTextAuthConfig plainTextAuthConfig, + final Properties properties) { + + String username = plainTextAuthConfig.getUsername(); + String password = plainTextAuthConfig.getPassword(); + properties.put(SASL_MECHANISM, PLAIN_MECHANISM); + properties.put(SASL_JAS_CONFIG, String.format(PLAINTEXT_JAASCONFIG, username, password)); + properties.put(SASL_SECURITY_PROTOCOL, SASL_PLAINTEXT_PROTOCOL); + } + + public static void setOauthProperties(final OAuthConfig oAuthConfig, + final Properties properties) { + final String oauthClientId = oAuthConfig.getOauthClientId(); + final String oauthClientSecret = oAuthConfig.getOauthClientSecret(); + final String oauthLoginServer = oAuthConfig.getOauthLoginServer(); + final String oauthLoginEndpoint = oAuthConfig.getOauthLoginEndpoint(); + final String oauthLoginGrantType = oAuthConfig.getOauthLoginGrantType(); + final String oauthLoginScope = oAuthConfig.getOauthLoginScope(); + final String oauthAuthorizationToken = Base64.getEncoder().encodeToString((oauthClientId + ":" + oauthClientSecret).getBytes()); + final String oauthIntrospectEndpoint = oAuthConfig.getOauthIntrospectEndpoint(); + final String tokenEndPointURL = oAuthConfig.getOauthTokenEndpointURL(); + final String saslMechanism = oAuthConfig.getOauthSaslMechanism(); + final String securityProtocol = oAuthConfig.getOauthSecurityProtocol(); + final String loginCallBackHandler = oAuthConfig.getOauthSaslLoginCallbackHandlerClass(); + final String oauthJwksEndpointURL = oAuthConfig.getOauthJwksEndpointURL(); + final String introspectServer = oAuthConfig.getOauthIntrospectServer(); + final String extensionLogicalCluster = oAuthConfig.getExtensionLogicalCluster(); + final String extensionIdentityPoolId = oAuthConfig.getExtensionIdentityPoolId(); + + properties.put(SASL_MECHANISM, saslMechanism); + properties.put(SASL_SECURITY_PROTOCOL, securityProtocol); + properties.put(SASL_TOKEN_ENDPOINT_URL, tokenEndPointURL); + properties.put(SASL_CALLBACK_HANDLER_CLASS, loginCallBackHandler); + + populateJwksEndpoint(properties, oauthJwksEndpointURL); + String instrospect_properties = getInstrospectProperties(oauthAuthorizationToken, oauthIntrospectEndpoint, oauthJwksEndpointURL, introspectServer); + String jass_config = createJassConfig(oauthClientId, oauthClientSecret, oauthLoginServer, + oauthLoginEndpoint, oauthLoginGrantType, oauthLoginScope, oauthAuthorizationToken, + extensionLogicalCluster, extensionIdentityPoolId, instrospect_properties); + + properties.put(SASL_JAS_CONFIG, jass_config); + } + + private static void populateJwksEndpoint(final Properties properties, final String oauthJwksEndpointURL) { + if (oauthJwksEndpointURL != null && !oauthJwksEndpointURL.isEmpty() && !oauthJwksEndpointURL.isBlank()) { + properties.put(SASL_JWKS_ENDPOINT_URL, oauthJwksEndpointURL); + } + } + + private static String getInstrospectProperties(final String oauthAuthorizationToken, final String oauthIntrospectEndpoint, + final String oauthJwksEndpointURL, final String introspectServer) { + String instrospect_properties = ""; + if (oauthJwksEndpointURL != null && !oauthIntrospectEndpoint.isBlank() && 
!oauthIntrospectEndpoint.isEmpty()) { + instrospect_properties = String.format(INSTROSPECT_SERVER_PROPERTIES, introspectServer, oauthIntrospectEndpoint, oauthAuthorizationToken); + } + return instrospect_properties; + } + + private static String createJassConfig(final String oauthClientId, final String oauthClientSecret, final String oauthLoginServer, + final String oauthLoginEndpoint, final String oauthLoginGrantType, final String oauthLoginScope, + final String oauthAuthorizationToken, final String extensionLogicalCluster, + final String extensionIdentityPoolId, final String instrospect_properties) { + String jass_config = String.format(OAUTH_JAASCONFIG, oauthClientId, oauthClientSecret, oauthLoginScope, oauthLoginServer, + oauthLoginEndpoint, oauthLoginGrantType, oauthLoginScope, oauthAuthorizationToken, instrospect_properties); + + jass_config = getJassConfigWithClusterInforation(extensionLogicalCluster, extensionIdentityPoolId, jass_config); + return jass_config; + } + + private static String getJassConfigWithClusterInforation(final String extensionLogicalCluster, final String extensionIdentityPoolId, + String jass_config) { + if (extensionLogicalCluster != null && extensionIdentityPoolId != null) { + String extensionValue = "extension_logicalCluster= \"%s\" extension_identityPoolId=" + " \"%s\";"; + jass_config = jass_config.replace(";", " "); + jass_config = jass_config + String.format(extensionValue, extensionLogicalCluster, extensionIdentityPoolId); + } + return jass_config; + } + + public static void setSaslPlainProperties(final PlainTextAuthConfig plainTextAuthConfig, + final Properties properties) { + + String username = plainTextAuthConfig.getUsername(); + String password = plainTextAuthConfig.getPassword(); + properties.put(SASL_MECHANISM, PLAIN_MECHANISM); + properties.put(SASL_JAS_CONFIG, String.format(PLAINTEXT_JAASCONFIG, username, password)); + properties.put(SASL_SECURITY_PROTOCOL, SASL_SSL_PROTOCOL); + + } + +} + diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java new file mode 100644 index 0000000000..8484335fce --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java @@ -0,0 +1,323 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + + +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; + +/** + * * This is static property configurer for related information given in pipeline.yml + */ +public class SinkPropertyConfigurer { + + private static final Logger LOG = LoggerFactory.getLogger(SinkPropertyConfigurer.class); + + private static final String 
VALUE_SERIALIZER = "value.serializer"; + + private static final String KEY_SERIALIZER = "key.serializer"; + + private static final String SESSION_TIMEOUT_MS_CONFIG = "30000"; + + private static final String REGISTRY_URL = "schema.registry.url"; + + private static final String REGISTRY_BASIC_AUTH_USER_INFO = "schema.registry.basic.auth.user.info"; + + private static final String CREDENTIALS_SOURCE = "basic.auth.credentials.source"; + + public static final String BUFFER_MEMORY = "buffer.memory"; + + public static final String COMPRESSION_TYPE = "compression.type"; + + public static final String RETRIES = "retries"; + + public static final String BATCH_SIZE = "batch.size"; + + public static final String CLIENT_DNS_LOOKUP = "client.dns.lookup"; + + public static final String CLIENT_ID = "client.id"; + + public static final String CONNECTIONS_MAX_IDLE_MS = "connections.max.idle.ms"; + + public static final String DELIVERY_TIMEOUT_MS = "delivery.timeout.ms"; + + public static final String LINGER_MS = "linger.ms"; + + public static final String MAX_BLOCK_MS = "max.block.ms"; + + public static final String MAX_REQUEST_SIZE = "max.request.size"; + + public static final String PARTITIONER_CLASS = "partitioner.class"; + + public static final String PARTITIONER_IGNORE_KEYS = "partitioner.ignore.keys"; + + public static final String RECEIVE_BUFFER_BYTES = "receive.buffer.bytes"; + + public static final String REQUEST_TIMEOUT_MS = "request.timeout.ms"; + + public static final String SEND_BUFFER_BYTES = "send.buffer.bytes"; + + public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = "socket.connection.setup.timeout.max.ms"; + + public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS = "socket.connection.setup.timeout.ms"; + + public static final String ACKS = "acks"; + + public static final String ENABLE_IDEMPOTENCE = "enable.idempotence"; + + public static final String INTERCEPTOR_CLASSES = "interceptor.classes"; + + public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; + + public static final String METADATA_MAX_AGE_MS = "metadata.max.age.ms"; + + public static final String METADATA_MAX_IDLE_MS = "metadata.max.idle.ms"; + + public static final String METRIC_REPORTERS = "metric.reporters"; + + public static final String METRICS_NUM_SAMPLES = "metrics.num.samples"; + + public static final String METRICS_RECORDING_LEVEL = "metrics.recording.level"; + + public static final String METRICS_SAMPLE_WINDOW_MS = "metrics.sample.window.ms"; + + public static final String PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE = "partitioner.adaptive.partitioning.enable"; + + public static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS = "partitioner.availability.timeout.ms"; + + public static final String RECONNECT_BACKOFF_MAX_MS = "reconnect.backoff.max.ms"; + + public static final String RECONNECT_BACKOFF_MS = "reconnect.backoff.ms"; + + public static final String RETRY_BACKOFF_MS = "retry.backoff.ms"; + + public static Properties getProducerProperties(final KafkaSinkConfig kafkaSinkConfig) { + final Properties properties = new Properties(); + + setCommonServerProperties(properties, kafkaSinkConfig); + + setPropertiesForSerializer(properties, kafkaSinkConfig.getSerdeFormat()); + + if (kafkaSinkConfig.getSchemaConfig() != null) { + setSchemaProps(kafkaSinkConfig, properties); + } + if (kafkaSinkConfig.getKafkaProducerProperties() != null) { + setPropertiesProviderByKafkaProducer(kafkaSinkConfig.getKafkaProducerProperties(), properties); + } + + 
setAuthProperties(kafkaSinkConfig, properties); + + return properties; + } + + private static void setAuthProperties(final KafkaSinkConfig kafkaSinkConfig, final Properties properties) { + if (kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig() != null) { + final String sslEndpointAlgorithm = kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getSslEndpointAlgorithm(); + if (null != sslEndpointAlgorithm && !sslEndpointAlgorithm.isEmpty() && sslEndpointAlgorithm.equalsIgnoreCase("https")) { + AuthenticationPropertyConfigurer.setSaslPlainProperties(kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig(), properties); + } else { + AuthenticationPropertyConfigurer.setSaslPlainTextProperties(kafkaSinkConfig.getAuthConfig() + .getSaslAuthConfig().getPlainTextAuthConfig(), properties); + } + + } else if (kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getOAuthConfig() != null) { + AuthenticationPropertyConfigurer.setOauthProperties(kafkaSinkConfig.getAuthConfig().getSaslAuthConfig() + .getOAuthConfig(), properties); + } + + + } + + private static void setCommonServerProperties(final Properties properties, final KafkaSinkConfig kafkaSinkConfig) { + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootStrapServers()); + properties.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG); + } + + private static void setPropertiesForSerializer(final Properties properties, final String serdeFormat) { + properties.put(KEY_SERIALIZER, StringSerializer.class.getName()); + if (serdeFormat.equalsIgnoreCase(MessageFormat.JSON.toString())) { + properties.put(VALUE_SERIALIZER, JsonSerializer.class.getName()); + } else if (serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())) { + properties.put(VALUE_SERIALIZER, KafkaAvroSerializer.class.getName()); + } else { + properties.put(VALUE_SERIALIZER, StringSerializer.class.getName()); + } + } + + private static void validateForRegistryURL(final KafkaSinkConfig kafkaSinkConfig) { + String serdeFormat = kafkaSinkConfig.getSerdeFormat(); + if (serdeFormat.equalsIgnoreCase(MessageFormat.AVRO.toString())) { + if (kafkaSinkConfig.getSchemaConfig() == null || kafkaSinkConfig.getSchemaConfig().getRegistryURL() == null || + kafkaSinkConfig.getSchemaConfig().getRegistryURL().isBlank() || kafkaSinkConfig.getSchemaConfig().getRegistryURL().isEmpty()) { + throw new RuntimeException("Schema registry is mandatory when serde type is avro"); + } + } + if (serdeFormat.equalsIgnoreCase(MessageFormat.PLAINTEXT.toString())) { + if (kafkaSinkConfig.getSchemaConfig() != null && + kafkaSinkConfig.getSchemaConfig().getRegistryURL() != null) { + throw new RuntimeException("Schema registry is not required for type plaintext"); + } + } + } + + public static void setSchemaProps(final KafkaSinkConfig kafkaSinkConfig, final Properties properties) { + validateForRegistryURL(kafkaSinkConfig); + SchemaConfig schemaConfig = kafkaSinkConfig.getSchemaConfig(); + final String registryURL = schemaConfig != null ? 
schemaConfig.getRegistryURL() : null; + if (registryURL != null && !registryURL.isEmpty()) { + properties.put(REGISTRY_URL, registryURL); + } + if (!ObjectUtils.isEmpty(schemaConfig.getBasicAuthCredentialsSource())) { + properties.put(CREDENTIALS_SOURCE, schemaConfig.getBasicAuthCredentialsSource()); + } + if (!ObjectUtils.isEmpty(schemaConfig.getSchemaRegistryApiKey()) && !(ObjectUtils.isEmpty(schemaConfig.getSchemaRegistryApiSecret()))) { + final String apiKey = schemaConfig.getSchemaRegistryApiKey(); + final String apiSecret = schemaConfig.getSchemaRegistryApiSecret(); + properties.put(REGISTRY_BASIC_AUTH_USER_INFO, apiKey + ":" + apiSecret); + } + } + + private static void setPropertiesProviderByKafkaProducer(final KafkaProducerProperties producerProperties, final + Properties properties) { + + if (producerProperties.getBufferMemory() != null) { + properties.put(BUFFER_MEMORY, ByteCount.parse(producerProperties.getBufferMemory()).getBytes()); + } + if (producerProperties.getCompressionType() != null) { + properties.put(COMPRESSION_TYPE, producerProperties.getCompressionType()); + } + properties.put(RETRIES, producerProperties.getRetries()); + + if (producerProperties.getBatchSize() > 0) { + properties.put(BATCH_SIZE, producerProperties.getBatchSize()); + } + if (producerProperties.getClientDnsLookup() != null) { + properties.put(CLIENT_DNS_LOOKUP, producerProperties.getClientDnsLookup()); + } + if (producerProperties.getClientId() != null) { + properties.put(CLIENT_ID, producerProperties.getClientId()); + } + if (producerProperties.getConnectionsMaxIdleMs() > 0) { + properties.put(CONNECTIONS_MAX_IDLE_MS, producerProperties.getConnectionsMaxIdleMs()); + } + if (producerProperties.getDeliveryTimeoutMs() > 0) { + properties.put(DELIVERY_TIMEOUT_MS, producerProperties.getDeliveryTimeoutMs().intValue()); + } + if (producerProperties.getLingerMs() > 0) { + properties.put(LINGER_MS, (producerProperties.getLingerMs())); + } + if (producerProperties.getMaxBlockMs() > 0) { + properties.put(MAX_BLOCK_MS, producerProperties.getMaxBlockMs()); + } + if (producerProperties.getMaxRequestSize() > 0) { + properties.put(MAX_REQUEST_SIZE, producerProperties.getMaxRequestSize()); + } + if (producerProperties.getPartitionerClass() != null) { + properties.put(PARTITIONER_CLASS, producerProperties.getPartitionerClass().getName()); + } + if (producerProperties.getPartitionerIgnoreKeys() != null) { + properties.put(PARTITIONER_IGNORE_KEYS, producerProperties.getPartitionerIgnoreKeys()); + } + if (producerProperties.getReceiveBufferBytes() != null) { + final Long receiveBufferBytes = ByteCount.parse(producerProperties.getReceiveBufferBytes()).getBytes(); + properties.put(RECEIVE_BUFFER_BYTES, receiveBufferBytes.intValue()); + } + if (producerProperties.getRequestTimeoutMs() > 0) { + properties.put(REQUEST_TIMEOUT_MS, producerProperties.getRequestTimeoutMs().intValue()); + } + if (producerProperties.getSendBufferBytes() != null) { + final Long sendBufferBytes = ByteCount.parse(producerProperties.getSendBufferBytes()).getBytes(); + properties.put(SEND_BUFFER_BYTES, sendBufferBytes.intValue()); + } + if (producerProperties.getSocketConnectionSetupMaxTimeout() > 0) { + properties.put(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, producerProperties.getSocketConnectionSetupMaxTimeout()); + } + if (producerProperties.getSocketConnectionSetupTimeout() > 0) { + properties.put(SOCKET_CONNECTION_SETUP_TIMEOUT_MS, producerProperties.getSocketConnectionSetupTimeout()); + } + if (producerProperties.getAcks() != null) { + 
properties.put(ACKS, producerProperties.getAcks()); + } + if (producerProperties.getEnableIdempotence() != null) { + properties.put(ENABLE_IDEMPOTENCE, producerProperties.getEnableIdempotence()); + } + + + List interceptorClasses = producerProperties.getInterceptorClasses(); + if (interceptorClasses != null && !interceptorClasses.isEmpty()) { + properties.put(INTERCEPTOR_CLASSES, String.join(",", interceptorClasses)); + } + + if (producerProperties.getMaxInFlightRequestsPerConnection() > 0) { + properties.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, producerProperties.getMaxInFlightRequestsPerConnection()); + } + + if (producerProperties.getMetadataMaxAgeMs() > 0) { + properties.put(METADATA_MAX_AGE_MS, producerProperties.getMetadataMaxAgeMs()); + } + + if (producerProperties.getMetadataMaxIdleMs() > 0) { + properties.put(METADATA_MAX_IDLE_MS, producerProperties.getMetadataMaxIdleMs()); + } + + + List metricReporters = producerProperties.getMetricReporters(); + if (metricReporters != null && !metricReporters.isEmpty()) { + properties.put(METRIC_REPORTERS, String.join(",", metricReporters)); + } + + if (producerProperties.getMetricsNumSamples() > 0) { + properties.put(METRICS_NUM_SAMPLES, producerProperties.getMetricsNumSamples()); + } + + if (producerProperties.getMetricsRecordingLevel() != null) { + properties.put(METRICS_RECORDING_LEVEL, producerProperties.getMetricsRecordingLevel()); + } + + if (producerProperties.getMetricsSampleWindowMs() > 0) { + properties.put(METRICS_SAMPLE_WINDOW_MS, producerProperties.getMetricsSampleWindowMs()); + } + + properties.put(PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE, producerProperties.isPartitionerAdaptivePartitioningEnable()); + + if (producerProperties.getPartitionerAvailabilityTimeoutMs() > 0) { + properties.put(PARTITIONER_AVAILABILITY_TIMEOUT_MS, producerProperties.getPartitionerAvailabilityTimeoutMs()); + } + + if (producerProperties.getReconnectBackoffMaxMs() > 0) { + properties.put(RECONNECT_BACKOFF_MAX_MS, producerProperties.getReconnectBackoffMaxMs()); + } + + if (producerProperties.getReconnectBackoffMs() > 0) { + properties.put(RECONNECT_BACKOFF_MS, producerProperties.getReconnectBackoffMs()); + } + + if (producerProperties.getRetryBackoffMs() > 0) { + properties.put(RETRY_BACKOFF_MS, producerProperties.getRetryBackoffMs()); + } + + + LOG.info("Producer properties"); + properties.entrySet().forEach(prop -> { + LOG.info("property " + prop.getKey() + " value" + prop.getValue()); + }); + + LOG.info("Producer properties ends"); + } + +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java new file mode 100644 index 0000000000..de3308a1f0 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.configuration.PluginSetting; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import 
static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + + +class KafkaSinkConfigTest { + + KafkaSinkConfig kafkaSinkConfig; + + List bootstrapServers; + + @BeforeEach + void setUp() { + kafkaSinkConfig = new KafkaSinkConfig(); + kafkaSinkConfig.setBootStrapServers(Arrays.asList("127.0.0.1:9093")); + kafkaSinkConfig.setAuthConfig(mock(AuthConfig.class)); + kafkaSinkConfig.setTopic(mock(TopicConfig.class)); + kafkaSinkConfig.setSchemaConfig((mock(SchemaConfig.class))); + kafkaSinkConfig.setThreadWaitTime(10L); + // kafkaSinkConfig.setSerdeFormat("JSON"); + + } + + @Test + void test_kafka_config_not_null() { + assertThat(kafkaSinkConfig, notNullValue()); + } + + @Test + void test_bootStrapServers_not_null() { + assertThat(kafkaSinkConfig.getBootStrapServers(), notNullValue()); + List servers = kafkaSinkConfig.getBootStrapServers(); + bootstrapServers = servers.stream(). + flatMap(str -> Arrays.stream(str.split(","))). + map(String::trim). + collect(Collectors.toList()); + assertThat(bootstrapServers.size(), equalTo(1)); + assertThat(bootstrapServers, hasItem("127.0.0.1:9093")); + } + + @Test + void test_topics_not_null() { + assertThat(kafkaSinkConfig.getTopic(), notNullValue()); + } + + @Test + void test_schema_not_null() { + assertThat(kafkaSinkConfig.getSchemaConfig(), notNullValue()); + } + + @Test + void test_authentication_not_null() { + assertThat(kafkaSinkConfig.getAuthConfig(), notNullValue()); + } + + @Test + void test_serde_format_not_null() { + assertThat(kafkaSinkConfig.getSerdeFormat(), notNullValue()); + } + + @Test + void test_thread_wait_time_null() { + assertThat(kafkaSinkConfig.getThreadWaitTime(), notNullValue()); + } + + @Test + public void testDLQConfiguration() { + final Map fakePlugin = new LinkedHashMap<>(); + final Map lowLevelPluginSettings = new HashMap<>(); + lowLevelPluginSettings.put("field1", "value1"); + lowLevelPluginSettings.put("field2", "value2"); + fakePlugin.put("another_dlq", lowLevelPluginSettings); + kafkaSinkConfig.setDlqConfig(generatePluginSetting(fakePlugin)); + assertEquals("another_dlq", kafkaSinkConfig.getDlq().get().getPluginName()); + } + + private PluginSetting generatePluginSetting(final Map pluginSettings) { + final Map metadata = new HashMap<>(); + if (pluginSettings != null) { + metadata.put(KafkaSinkConfig.DLQ, pluginSettings); + } + + return new PluginSetting("S3", metadata); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java new file mode 100644 index 0000000000..1d28b5d50e --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java @@ -0,0 +1,150 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.producer; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaMetadata; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.producer.MockProducer; +import 
org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) + +public class KafkaSinkProducerTest { + + private KafkaSinkProducer producer; + + @Mock + private KafkaSinkConfig kafkaSinkConfig; + + private Record record; + + KafkaSinkProducer sinkProducer; + + @Mock + private DLQSink dlqSink; + + private Event event; + + @Mock + private CachedSchemaRegistryClient cachedSchemaRegistryClient; + + final String tag = "tag"; + + @BeforeEach + public void setUp() { + event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + record = new Record<>(event); + final TopicConfig topicConfig = new TopicConfig(); + topicConfig.setName("test-topic"); + + when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig); + when(kafkaSinkConfig.getSchemaConfig()).thenReturn(mock(SchemaConfig.class)); + when(kafkaSinkConfig.getSchemaConfig().getRegistryURL()).thenReturn("http://localhost:8085/"); + when(kafkaSinkConfig.getPartitionKey()).thenReturn("testkey"); + + } + + @Test + public void producePlainTextRecordsTest() throws ExecutionException, InterruptedException { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, cachedSchemaRegistryClient, mock(ExpressionEvaluator.class), tag); + sinkProducer = spy(producer); + sinkProducer.produceRecords(record); + verify(sinkProducer).produceRecords(record); + assertEquals(1, mockProducer.history().size()); + + } + + @Test + public void produceJsonRecordsTest() throws RestClientException, IOException { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("json"); + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new JsonSerializer()); + producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, cachedSchemaRegistryClient, mock(ExpressionEvaluator.class), tag); + SchemaMetadata schemaMetadata = mock(SchemaMetadata.class); + String jsonSchema = "{\n" + + " \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n" + + " 
\"type\": \"object\",\n" + + " \"properties\": {\n" + + " \"message\": {\n" + + " \"type\": \"string\"\n" + + " }\n" + + "}\n" + + "}\n"; + + when(schemaMetadata.getSchema()).thenReturn(jsonSchema); + when(cachedSchemaRegistryClient.getLatestSchemaMetadata(kafkaSinkConfig.getTopic().getName() + "-value")).thenReturn(schemaMetadata); + sinkProducer = spy(producer); + sinkProducer.produceRecords(record); + verify(sinkProducer).produceRecords(record); + assertEquals(1, mockProducer.history().size()); + } + + @Test + public void produceAvroRecordsTest() throws Exception { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("avro"); + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, cachedSchemaRegistryClient, mock(ExpressionEvaluator.class), tag); + SchemaMetadata schemaMetadata = mock(SchemaMetadata.class); + String avroSchema = "{\"type\":\"record\",\"name\":\"MyMessage\",\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}"; + when(schemaMetadata.getSchema()).thenReturn(avroSchema); + when(cachedSchemaRegistryClient.getLatestSchemaMetadata(kafkaSinkConfig.getTopic().getName() + "-value")).thenReturn(schemaMetadata); + sinkProducer = spy(producer); + sinkProducer.produceRecords(record); + verify(sinkProducer).produceRecords(record); + + } + + @Test + public void testGetGenericRecord() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + producer = new KafkaSinkProducer(new MockProducer(), kafkaSinkConfig, dlqSink, cachedSchemaRegistryClient, mock(ExpressionEvaluator.class), tag); + final Schema schema = createMockSchema(); + Method privateMethod = KafkaSinkProducer.class.getDeclaredMethod("getGenericRecord", Event.class, Schema.class); + privateMethod.setAccessible(true); + GenericRecord result = (GenericRecord) privateMethod.invoke(producer, event, schema); + Assertions.assertNotNull(result); + } + + private Schema createMockSchema() { + String schemaDefinition = "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}"; + return new Schema.Parser().parse(schemaDefinition); + } +} + diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java new file mode 100644 index 0000000000..65492af774 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.dataprepper.plugins.kafka.producer; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + + +class ProducerWorkerTest { + + @Mock + ProducerWorker producerWorker; + + + private Record record; + + + @BeforeEach + public void setUp() { + Event event = JacksonEvent.fromMessage("Testing multithreaded producer"); + record = new Record<>(event); + } + + + private ProducerWorker 
createObjectUnderTest() { + return new ProducerWorker(mock(KafkaSinkProducer.class), record); + } + + @Test + void testWritingToTopic() { + producerWorker = createObjectUnderTest(); + Thread spySink = spy(new Thread(producerWorker)); + spySink.start(); + verify(spySink).start(); + } + + +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java new file mode 100644 index 0000000000..09f697cfda --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.sink; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.dlq.DlqProvider; +import org.opensearch.dataprepper.plugins.dlq.DlqWriter; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.springframework.test.util.ReflectionTestUtils; +import org.yaml.snakeyaml.Yaml; + +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class DLQSinkTest { + + @Mock + private PluginFactory pluginFactory; + + + private KafkaSinkConfig kafkaSinkConfig; + + @Mock + private DlqProvider dlqProvider; + + @Mock + private DlqWriter dlqWriter; + + + private PluginSetting pluginSetting; + + private DLQSink dlqSink; + + @BeforeEach + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + pluginSetting = new PluginSetting("kafka-sink", new HashMap<>()); + + + Yaml yaml = new Yaml(); + FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile()); + Object data = yaml.load(fileReader); + if (data instanceof Map) { + Map propertyMap = (Map) data; + Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); + Map sinkeMap = (Map) logPipelineMap.get("sink"); + Map kafkaConfigMap = (Map) sinkeMap.get("kafka"); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + String json = mapper.writeValueAsString(kafkaConfigMap); + Reader reader = new StringReader(json); + kafkaSinkConfig = mapper.readValue(reader, KafkaSinkConfig.class); + + + } + dlqSink = new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting); + } + + @Test + public void testPerform() throws 
IOException { + Object failedData = new Object(); + ReflectionTestUtils.setField(pluginSetting, "pipelineName", "test"); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + dlqSink.perform(failedData, mock(Exception.class)); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + + +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java new file mode 100644 index 0000000000..4f12f11c87 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java @@ -0,0 +1,191 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.dataprepper.plugins.kafka.sink; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; +import org.springframework.test.util.ReflectionTestUtils; +import org.yaml.snakeyaml.Yaml; + +import java.io.FileReader; +import java.io.Reader; +import java.io.StringReader; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class KafkasinkTest { + + + KafkaSink kafkaSink; + + + KafkaSinkConfig kafkaSinkConfig; + + + ExecutorService executorService; + + @Mock + PluginSetting pluginSetting; + + @Mock + FutureTask futureTask; + + + Event event; + + KafkaSink spySink; + + private static final Integer totalWorkers = 1; + + MockedStatic executorsMockedStatic; + + @Mock + private PluginFactory pluginFactoryMock; + + Properties props; + + @Mock + SinkContext sinkContext; + + + @BeforeEach + void setUp() throws Exception { + Yaml yaml = new Yaml(); + 
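// Deserialize the kafka sink block of sample-pipelines-sink.yaml into the KafkaSinkConfig used by these tests +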
FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile()); + Object data = yaml.load(fileReader); + if (data instanceof Map) { + Map propertyMap = (Map) data; + Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); + Map sinkeMap = (Map) logPipelineMap.get("sink"); + Map kafkaConfigMap = (Map) sinkeMap.get("kafka"); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + String json = mapper.writeValueAsString(kafkaConfigMap); + Reader reader = new StringReader(json); + kafkaSinkConfig = mapper.readValue(reader, KafkaSinkConfig.class); + } + executorService = mock(ExecutorService.class); + when(pluginSetting.getPipelineName()).thenReturn("Kafka-sink"); + event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + when(sinkContext.getTagsTargetKey()).thenReturn("tag"); + kafkaSink = new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactoryMock, mock(ExpressionEvaluator.class), sinkContext); + spySink = spy(kafkaSink); + executorsMockedStatic = mockStatic(Executors.class); + props = new Properties(); + props.put("bootstrap.servers", "127.0.0.1:9093"); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + ReflectionTestUtils.setField(spySink, "executorService", executorService); + + + } + + @AfterEach + public void after() { + executorsMockedStatic.close(); + } + + @Test + public void doOutputTest() { + ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", null); + when(executorService.submit(any(ProducerWorker.class))).thenReturn(futureTask); + final Collection records = Arrays.asList(new Record(event)); + spySink.doOutput(records); + verify(spySink).doOutput(records); + } + + + @Test + public void doOutputExceptionTest() { + final Collection records = Arrays.asList(new Record(event)); + when(executorService.submit(any(ProducerWorker.class))).thenThrow(new RuntimeException()); + assertThrows(RuntimeException.class, () -> spySink.doOutput(records)); + } + + @Test + public void doOutputEmptyRecordsTest() { + final Collection records = Arrays.asList(); + spySink.doOutput(records); + verify(spySink).doOutput(records); + + } + + @Test + public void shutdownTest() { + spySink.shutdown(); + verify(spySink).shutdown(); + } + + @Test + public void shutdownExceptionTest() throws InterruptedException { + final InterruptedException interruptedException = new InterruptedException(); + interruptedException.initCause(new InterruptedException()); + + when(executorService.awaitTermination( + 1000L, TimeUnit.MILLISECONDS)).thenThrow(interruptedException); + spySink.shutdown(); + + } + + + @Test + public void doInitializeTest() { + spySink.doInitialize(); + verify(spySink).doInitialize(); + } + + @Test + public void doInitializeNullPointerExceptionTest() { + when(Executors.newFixedThreadPool(totalWorkers)).thenThrow(NullPointerException.class); + assertThrows(NullPointerException.class, () -> spySink.doInitialize()); + } + + + @Test + public void isReadyTest() { + ReflectionTestUtils.setField(kafkaSink, "sinkInitialized", true); + assertEquals(true, kafkaSink.isReady()); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java new file mode 100644 index 0000000000..0656fe96ee 
--- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurerTest.java @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.yaml.snakeyaml.Yaml; + +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.Map; +import java.util.Properties; + +@ExtendWith(MockitoExtension.class) +public class AuthenticationPropertyConfigurerTest { + + + KafkaSinkConfig kafkaSinkConfig; + + + private KafkaSinkConfig createKafkaSinkConfig(String fileName) throws IOException { + Yaml yaml = new Yaml(); + FileReader fileReader = new FileReader(getClass().getClassLoader().getResource(fileName).getFile()); + Object data = yaml.load(fileReader); + if (data instanceof Map) { + Map propertyMap = (Map) data; + Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); + Map sinkeMap = (Map) logPipelineMap.get("sink"); + Map kafkaConfigMap = (Map) sinkeMap.get("kafka"); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + String json = mapper.writeValueAsString(kafkaConfigMap); + Reader reader = new StringReader(json); + return mapper.readValue(reader, KafkaSinkConfig.class); + } + return null; + + } + + @Test + public void testSetSaslPlainTextProperties() throws IOException { + Properties props = new Properties(); + kafkaSinkConfig = createKafkaSinkConfig("sample-pipelines-sink.yaml"); + AuthenticationPropertyConfigurer.setSaslPlainTextProperties(kafkaSinkConfig.getAuthConfig().getSaslAuthConfig(). 
+ getPlainTextAuthConfig(), props); + Assertions.assertEquals("PLAIN", props.getProperty("sasl.mechanism")); + } + + @Test + public void testSetSaslOauthProperties() throws IOException { + Properties props = new Properties(); + kafkaSinkConfig = createKafkaSinkConfig("sample-pipelines-sink-oauth.yaml"); + AuthenticationPropertyConfigurer.setOauthProperties(kafkaSinkConfig.getAuthConfig().getSaslAuthConfig() + .getOAuthConfig(), props); + Assertions.assertEquals("OAUTHBEARER", props.getProperty("sasl.mechanism")); + } + + @Test + public void testSetSaslSslProperties() throws IOException { + Properties props = new Properties(); + kafkaSinkConfig = createKafkaSinkConfig("sample-pipelines-sink-ssl.yaml"); + AuthenticationPropertyConfigurer.setSaslPlainProperties(kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig(), props); + Assertions.assertEquals("PLAIN", props.getProperty("sasl.mechanism")); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java new file mode 100644 index 0000000000..4319085f97 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurerTest.java @@ -0,0 +1,92 @@ +package org.opensearch.dataprepper.plugins.kafka.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.springframework.test.util.ReflectionTestUtils; +import org.yaml.snakeyaml.Yaml; + +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.Map; +import java.util.Properties; + +import static org.mockito.Mockito.mockStatic; + +@ExtendWith(MockitoExtension.class) +public class SinkPropertyConfigurerTest { + + + KafkaSinkConfig kafkaSinkConfig; + + MockedStatic authenticationPropertyConfigurerMockedStatic; + + + @BeforeEach + public void setUp() throws IOException { + + Yaml yaml = new Yaml(); + FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-sink.yaml").getFile()); + Object data = yaml.load(fileReader); + if (data instanceof Map) { + Map propertyMap = (Map) data; + Map logPipelineMap = (Map) propertyMap.get("log-pipeline"); + Map sinkeMap = (Map) logPipelineMap.get("sink"); + Map kafkaConfigMap = (Map) sinkeMap.get("kafka"); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + String json = mapper.writeValueAsString(kafkaConfigMap); + Reader reader = new StringReader(json); + kafkaSinkConfig = mapper.readValue(reader, KafkaSinkConfig.class); + } + + authenticationPropertyConfigurerMockedStatic = mockStatic(AuthenticationPropertyConfigurer.class); + } + + @AfterEach + public void postProcess() { + authenticationPropertyConfigurerMockedStatic.close(); + } + + @Test + public void testGetProducerPropertiesForJson() { + 
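// Null out the schema registry config and verify that producer properties can still be built from the sample pipeline config +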
ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", null); + Properties props = SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig); + Assertions.assertEquals("30000", props.getProperty("session.timeout.ms")); + } + + @Test + public void testGetProducerPropertiesForPlainText() { + ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", "plaintext"); + Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig)); + + } + + @Test + public void testGetProducerPropertiesForAvro() { + ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", "avro"); + SchemaConfig schemaConfig = kafkaSinkConfig.getSchemaConfig(); + ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", null); + // Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig)); + ReflectionTestUtils.setField(kafkaSinkConfig, "schemaConfig", schemaConfig); + Assertions.assertEquals("30000", SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig).getProperty("session.timeout.ms")); + + } + + @Test + public void testGetProducerPropertiesForNoSerde() { + ReflectionTestUtils.setField(kafkaSinkConfig, "serdeFormat", null); + Assertions.assertThrows(RuntimeException.class, () -> SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig)); + } + +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml new file mode 100644 index 0000000000..c436697e96 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-oauth.yaml @@ -0,0 +1,34 @@ +log-pipeline : + source : + random: + sink : + kafka: + bootstrap_servers: + - "http://localhost:9092" + thread_wait_time: 1000 + dlq: + s3: + bucket: "mydlqtestbucket" + key_path_prefix: "dlq-files/" + sts_role_arn: "arn:aws:iam::045129910014:role/dataprepper" + region: "ap-south-1" + topic: + name: plaintext + authentication: + sasl: + oauth: + oauth_client_id: XXXXXXXXXXXX + oauth_client_secret: XXXXXXXXXXXX + oauth_login_server: https://dev-XXXXXXXXXXXX.okta.com + oauth_login_endpoint: /oauth2/default/v1/token + oauth_login_grant_type: refresh_token + oauth_login_scope: kafka + oauth_introspect_server: https://dev-XXXXXXXXXXXX.okta.com + oauth_introspect_endpoint: /oauth2/default/v1/introspect + oauth_token_endpoint_url: https://dev-XXXXXXXXXXXX.okta.com/oauth2/default/v1/token + oauth_sasl_mechanism: OAUTHBEARER + oauth_security_protocol: SASL_SSL + oauth_sasl_login_callback_handler_class: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler + oauth_jwks_endpoint_url: https://dev-XXXXXXXXXXXX.okta.com/oauth2/default/v1/keys + extension_logicalCluster: lkc-XXXXXXXXXXXX + extension_identityPoolId: pool-XXXXXXXXXXXX \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-ssl.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-ssl.yaml new file mode 100644 index 0000000000..7e4be88737 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink-ssl.yaml @@ -0,0 +1,56 @@ +log-pipeline : + source : + random: + sink : + kafka: + bootstrap_servers: + - "http://localhost:9092" + thread_wait_time: 1000 + dlq: + s3: + bucket: "mydlqtestbucket" + key_path_prefix: "dlq-files/" + sts_role_arn: "arn:aws:iam::xxxxx:role/dataprepper" + 
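# records that fail to publish to Kafka are written to this S3 DLQ (role arn is a placeholder) +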
region: "ap-south-1" + + serde_format: plaintext + topic: + name: ssl_test_topic + producer_properties: + buffer_memory: 10mb + compression_type: gzip + retries: 3 + batch_size: 16384 + client_dns_lookup: use_all_dns_ips + connections_max_idle: PT2M + delivery_timeout: PT2M + linger_ms: 0 + max_block: PT2M + max_request_size: 1048576 + partitioner_class: org.apache.kafka.clients.producer.internals.DefaultPartitioner + partitioner_ignore_keys: false + receive_buffer: 3mb + request_timeout: PT2M + send_buffer: 2mb + socket_connection_setup_timeout_max: PT2M + socket_connection_setup_timeout: PT2M + acks: all + enable_idempotence: true + max_in_flight_requests_per_connection: 5 + metadata_max_age: PT2M + metadata_max_idle: PT2M + metrics_num_samples: 2 + metrics_recording_level: INFO + metrics_sample_window: PT2M + partitioner_adaptive_partitioning_enable: true + partitioner_availability_timeout: PT2M + reconnect_backoff_max: PT2M + reconnect_backoff: PT2M + retry_backoff: PT2M + authentication: + sasl: + ssl_endpoint_identification_algorithm: https + plaintext: + username: 2BRMRAPEMGRJ25AY + password: C6vLQcD0bmTcDBkzvkUMxPFEsJB2XocSg8aJaLOZQT3PN28WrbR8a0+bRRjYyyei + diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml new file mode 100644 index 0000000000..7e8b540061 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-sink.yaml @@ -0,0 +1,57 @@ +log-pipeline : + source : + random: + sink : + kafka: + bootstrap_servers: + - "localhost:9092" + thread_wait_time: 1000 + dlq: + s3: + bucket: "mydlqtestbucket" + key_path_prefix: "dlq-files/" + sts_role_arn: "arn:aws:iam::XXXXXXXXX:role/dataprepper" + region: "ap-south-1" + serde_format: plaintext + topic: + name: plaintext_test_topic + producer_properties: + buffer_memory: 10mb + compression_type: gzip + retries: 3 + batch_size: 16384 + client_dns_lookup: use_all_dns_ips + connections_max_idle: PT2M + delivery_timeout: PT2M + linger_ms: 0 + max_block: PT2M + max_request_size: 1048576 + partitioner_class: org.apache.kafka.clients.producer.internals.DefaultPartitioner + partitioner_ignore_keys: false + receive_buffer: 3mb + request_timeout: PT2M + send_buffer: 2mb + socket_connection_setup_timeout_max: PT2M + socket_connection_setup_timeout: PT2M + acks: all + enable_idempotence: true + max_in_flight_requests_per_connection: 5 + metadata_max_age: PT2M + metadata_max_idle: PT2M + metrics_num_samples: 2 + metrics_recording_level: INFO + metrics_sample_window: PT2M + partitioner_adaptive_partitioning_enable: true + partitioner_availability_timeout: PT2M + reconnect_backoff_max: PT2M + reconnect_backoff: PT2M + retry_backoff: PT2M + schema: + registry_url: https://localhost:8085 + authentication: + sasl: + plaintext: + username: username + password: password + +