diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index b43e901019..d3523af5a7 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -177,7 +177,7 @@ protected int getRecordsInFlight() { * * @param recordsInBuffer the current number of records in the buffer */ - protected void postProcess(final Long recordsInBuffer) { + public void postProcess(final Long recordsInBuffer) { } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderReceiveBuffer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderReceiveBuffer.java index 1b410c5157..bfb98aa8a7 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderReceiveBuffer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderReceiveBuffer.java @@ -137,7 +137,7 @@ private T pollForBufferEntry(final int timeoutValue, final TimeUnit timeoutUnit) } @Override - protected void postProcess(final Long recordsInBuffer) { + public void postProcess(final Long recordsInBuffer) { // adding bounds to address race conditions and reporting negative buffer usage final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue(); final Double boundedTotalRecords = nonNegativeTotalRecords > bufferSize ? bufferSize : nonNegativeTotalRecords; diff --git a/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java index 397c857704..fb67ccf543 100644 --- a/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java +++ b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java @@ -197,7 +197,7 @@ public static PluginSetting getDefaultPluginSettings() { } @Override - protected void postProcess(final Long recordsInBuffer) { + public void postProcess(final Long recordsInBuffer) { // adding bounds to address race conditions and reporting negative buffer usage final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue(); final Double boundedTotalRecords = nonNegativeTotalRecords > bufferCapacity ? bufferCapacity : nonNegativeTotalRecords;
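The visibility change above (protected to public on postProcess) is what lets a buffer in another package forward the callback to an inner buffer it composes, which is exactly the pattern the new KafkaBuffer later in this diff relies on. A minimal sketch of that delegation, using only types already present in this change set (the DelegatingBuffer name is illustrative):

```java
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;

// A buffer that wraps another buffer forwards postProcess to it. Across
// packages, this call compiles only now that postProcess is public.
abstract class DelegatingBuffer<T extends Record<?>> extends AbstractBuffer<T> {
    private final AbstractBuffer<T> innerBuffer;

    DelegatingBuffer(final PluginSetting pluginSetting, final AbstractBuffer<T> innerBuffer) {
        super(pluginSetting);
        this.innerBuffer = innerBuffer;
    }

    @Override
    public void postProcess(final Long recordsInBuffer) {
        innerBuffer.postProcess(recordsInBuffer);
    }
}
```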
diff --git a/data-prepper-plugins/kafka-plugins/README-sink.md b/data-prepper-plugins/kafka-plugins/README-sink.md index 9a415d1155..5435b1f811 100644 --- a/data-prepper-plugins/kafka-plugins/README-sink.md +++ b/data-prepper-plugins/kafka-plugins/README-sink.md @@ -274,7 +274,7 @@ bin/zookeeper-server-start.sh config/zookeeper.properties 2. Start Kafka Server with the following configuration Configuration in config/server.properties ``` -isteners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095 +listeners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095 security.inter.broker.protocol=SASL_SSL sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN diff --git a/data-prepper-plugins/kafka-plugins/README.md b/data-prepper-plugins/kafka-plugins/README.md index 31463d45ff..698777350b 100644 --- a/data-prepper-plugins/kafka-plugins/README.md +++ b/data-prepper-plugins/kafka-plugins/README.md @@ -17,7 +17,7 @@ bin/zookeeper-server-start.sh config/zookeeper.properties 2. Start Kafka Server with the following configuration Configuration in config/server.properties ``` -isteners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095 +listeners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095 security.inter.broker.protocol=SASL_SSL sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 3436de0133..4a98d071c8 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -10,6 +10,7 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:buffer-common') + implementation project(':data-prepper-plugins:blocking-buffer') implementation 'org.apache.kafka:kafka-clients:3.4.0' implementation 'org.apache.avro:avro:1.11.0' implementation 'com.fasterxml.jackson.core:jackson-databind' diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java index 2f0e718cc2..6830168e25 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkAvroTypeIT.java @@ -155,7 +155,7 @@ public void setup() throws RestClientException, IOException { when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); - when(kafkaSinkConfig.getBootStrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); + when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java index 278b63ce1e..30dec4a628 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkJsonTypeIT.java @@ -132,7 +132,7 @@ public void
setup() { when(topicConfig.getAutoOffsetReset()).thenReturn("earliest"); when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); - when(kafkaSinkConfig.getBootStrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); + when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } @@ -253,6 +253,7 @@ private void pollRecords(List<Record<Event>> recList, KafkaConsumer<String, String> kafkaConsumer) { diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java private List<TopicConfig> topicList; @Mock - private KafkaSourceConfig.EncryptionConfig encryptionConfig; + private EncryptionConfig encryptionConfig; @Mock private TopicConfig jsonTopic; @@ -96,7 +97,7 @@ public void setup() { pluginMetrics = mock(PluginMetrics.class); counter = mock(Counter.class); buffer = mock(Buffer.class); - encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class); + encryptionConfig = mock(EncryptionConfig.class); receivedRecords = new ArrayList<>(); ExecutorService executor = Executors.newFixedThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); @@ -128,7 +129,7 @@ public void setup() { when(jsonTopic.getAutoOffsetReset()).thenReturn("earliest"); when(jsonTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); - when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers); + when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); } diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java index aca9c8dd5c..0556c13cf4 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java @@ -23,6 +23,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; @@ -77,7 +78,7 @@ public class KafkaSourceMultipleAuthTypeIT { private PlainTextAuthConfig plainTextAuthConfig; @Mock - private KafkaSourceConfig.EncryptionConfig encryptionConfig; + private EncryptionConfig encryptionConfig; private TopicConfig jsonTopic; private TopicConfig avroTopic; @@ -105,7 +106,7 @@ public void setup() { pluginMetrics = mock(PluginMetrics.class); counter = mock(Counter.class); buffer = mock(Buffer.class); - encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class); + encryptionConfig = mock(EncryptionConfig.class); receivedRecords = new ArrayList<>(); acknowledgementSetManager = mock(AcknowledgementSetManager.class); pipelineDescription = mock(PipelineDescription.class); @@ -140,7
+141,7 @@ public void setup() { sslBootstrapServers = System.getProperty("tests.kafka.ssl_bootstrap_servers"); kafkaUsername = System.getProperty("tests.kafka.username"); kafkaPassword = System.getProperty("tests.kafka.password"); - when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers); + when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); } @@ -205,7 +206,7 @@ public void TestPlainTextWithAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() thr when(sourceConfig.getAuthConfig()).thenReturn(authConfig); when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig); - when(sourceConfig.getBootStrapServers()).thenReturn(saslplainBootstrapServers); + when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(saslplainBootstrapServers)); kafkaSource = createObjectUnderTest(); Properties props = new Properties(); @@ -256,7 +257,7 @@ public void TestPlainTextWithNoAuthKafkaEncryptionWithNoAuthSchemaRegistry() thr when(encryptionConfig.getInsecure()).thenReturn(true); when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL); when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(sourceConfig.getBootStrapServers()).thenReturn(sslBootstrapServers); + when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(sslBootstrapServers)); when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic)); kafkaSource = createObjectUnderTest(); @@ -312,7 +313,7 @@ public void TestPlainTextWithAuthKafkaEncryptionWithNoAuthSchemaRegistry() throw when(encryptionConfig.getInsecure()).thenReturn(true); when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL); when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords); - when(sourceConfig.getBootStrapServers()).thenReturn(saslsslBootstrapServers); + when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(saslsslBootstrapServers)); when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic)); kafkaSource = createObjectUnderTest(); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java index 6bd4202cf6..47fc7f2ca8 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java @@ -33,6 +33,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.MskBrokerConnectionType; @@ -97,7 +98,7 @@ public class MskGlueRegistryMultiTypeIT { private AwsConfig.AwsMskConfig awsMskConfig; @Mock - private 
KafkaSourceConfig.EncryptionConfig encryptionConfig; + private EncryptionConfig encryptionConfig; private KafkaSource kafkaSource; private TopicConfig jsonTopic; @@ -180,8 +181,8 @@ public void setup() { testAvroSchemaName = System.getProperty("tests.kafka.glue_avro_schema_name"); testMskArn = System.getProperty("tests.msk.arn"); testMskRegion = System.getProperty("tests.msk.region"); - when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers); - encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class); + when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers)); + encryptionConfig = mock(EncryptionConfig.class); when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService"); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java new file mode 100644 index 0000000000..c793041545 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -0,0 +1,91 @@ +package org.opensearch.dataprepper.plugins.kafka.buffer; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.AbstractBuffer; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig; +import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer; +import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; +import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; +import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +@DataPrepperPlugin(name = "kafka", pluginType = Buffer.class, pluginConfigurationType = KafkaBufferConfig.class) +public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer.class); + public static final int INNER_BUFFER_CAPACITY = 1000000; + public static final int INNER_BUFFER_BATCH_SIZE = 250000; + private final KafkaCustomProducer producer; + private final AbstractBuffer<T> innerBuffer; + private final ExecutorService executorService; + + @DataPrepperPluginConstructor + public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics){ + super(pluginSetting); + final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(); + producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null); + final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(); + innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName()); + final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), + innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false)); + this.executorService = Executors.newFixedThreadPool(consumers.size()); + consumers.forEach(this.executorService::submit); + } + + @Override + public void doWrite(T record, int timeoutInMillis) throws TimeoutException { + try { + producer.produceRecords(record); + } catch (final Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Override + public void doWriteAll(Collection<T> records, int timeoutInMillis) throws Exception { + for (T record : records) { + doWrite(record, timeoutInMillis); + } + } + + @Override + public Map.Entry<Collection<T>, CheckpointState> doRead(int timeoutInMillis) { + return innerBuffer.doRead(timeoutInMillis); + } + + @Override + public void postProcess(final Long recordsInBuffer) { + innerBuffer.postProcess(recordsInBuffer); + } + + @Override + public void doCheckpoint(CheckpointState checkpointState) { + innerBuffer.doCheckpoint(checkpointState); + } + + @Override + public boolean isEmpty() { + // TODO: check Kafka topic is empty as well. + return innerBuffer.isEmpty(); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java index 972d737d12..af055ab89a 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java @@ -42,6 +42,9 @@ public MskBrokerConnectionType getBrokerConnectionType() { @JsonProperty("sts_role_arn") private String stsRoleArn; + @JsonProperty("role_session_name") + private String stsRoleSessionName; + public AwsMskConfig getAwsMskConfig() { return awsMskConfig; } @@ -53,4 +56,8 @@ public String getRegion() { public String getStsRoleArn() { return stsRoleArn; } + + public String getStsRoleSessionName() { + return stsRoleSessionName; + } }
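The aws block gains a role_session_name alongside sts_role_arn. As a rough illustration only (the plugin's actual wiring lives in KafkaSecurityConfigurer, and the fallback session name here is an assumption), this is how such a setting typically feeds AWS SDK v2 assume-role credentials; the session name is what later shows up in CloudTrail for the assumed role:

```java
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

class AssumeRoleSketch {
    static StsAssumeRoleCredentialsProvider credentialsFor(final AwsConfig awsConfig) {
        final String sessionName = awsConfig.getStsRoleSessionName() != null
                ? awsConfig.getStsRoleSessionName()
                : "Data-Prepper"; // hypothetical default, not taken from this diff
        return StsAssumeRoleCredentialsProvider.builder()
                .stsClient(StsClient.create())
                .refreshRequest(AssumeRoleRequest.builder()
                        .roleArn(awsConfig.getStsRoleArn())
                        .roleSessionName(sessionName)
                        .build())
                .build();
    }
}
```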
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionConfig.java new file mode 100644 index 0000000000..a6642e5afe --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionConfig.java @@ -0,0 +1,19 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class EncryptionConfig { + @JsonProperty("type") + private EncryptionType type = EncryptionType.SSL; + + @JsonProperty("insecure") + private boolean insecure = false; + + public EncryptionType getType() { + return type; + } + + public boolean getInsecure() { + return insecure; + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java new file mode 100644 index 0000000000..5dd1f68b42 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java @@ -0,0 +1,116 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +public class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig { + + @JsonProperty("bootstrap_servers") + private List<String> bootStrapServers; + + @JsonProperty("topics") + @NotNull + @Size(min = 1, max = 1, message = "Only one topic currently supported for Kafka buffer") + private List<TopicConfig> topics; + + @JsonProperty("schema") + @Valid + private SchemaConfig schemaConfig; + + @Valid + @JsonProperty("authentication") + private AuthConfig authConfig; + + @JsonProperty("encryption") + private EncryptionConfig encryptionConfig; + + @JsonProperty("producer_properties") + private KafkaProducerProperties kafkaProducerProperties; + + @JsonProperty("aws") + @Valid + private AwsConfig awsConfig; + + + public List<String> getBootstrapServers() { + if (Objects.nonNull(bootStrapServers)) { + return bootStrapServers; + } + return null; + } + + @Override + public AuthConfig getAuthConfig() { + return authConfig; + } + + @Override + public SchemaConfig getSchemaConfig() { + return schemaConfig; + } + + @Override + public String getSerdeFormat() { + return getTopic().getSerdeFormat().toString(); + } + + @Override + public TopicConfig getTopic() { + return topics.get(0); + } + + @Override + public List<TopicConfig> getTopics() { + return topics; + } + + @Override + public KafkaProducerProperties getKafkaProducerProperties() { + return kafkaProducerProperties; + } + + @Override + public String getPartitionKey() { + return "pipeline-buffer"; + } + + @Override + public Optional<PluginModel> getDlq() { + // TODO: move DLQ logic to be sink specific (currently, write to DLQ is handled by KafkaCustomConsumer) + return Optional.empty(); + } + @Override + public void setDlqConfig(PluginSetting pluginSetting) { + + } + @Override + public AwsConfig getAwsConfig() { + return awsConfig; + } + + @Override + public EncryptionConfig getEncryptionConfig() { + if (Objects.isNull(encryptionConfig)) { + return new EncryptionConfig(); + } + return encryptionConfig; + } + + @Override + public String getClientDnsLookup() { + return null; + } + + @Override + public boolean getAcknowledgementsEnabled() { + return false; + } +}
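KafkaBufferConfig is plain Jackson-annotated configuration, so its shape can be exercised in isolation. A small sketch that binds a YAML snippet to the class (this assumes the jackson-dataformat-yaml module and assumes TopicConfig exposes name/group_id properties, which is not shown in this diff):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig;

class KafkaBufferConfigSketch {
    public static void main(String[] args) throws Exception {
        // Mirrors what a pipeline's buffer section would contain; exactly one
        // topic is allowed per the @Size(min = 1, max = 1) constraint above.
        final String yaml =
                "bootstrap_servers: [\"localhost:9092\"]\n" +
                "topics:\n" +
                "  - name: buffer-topic\n" +
                "    group_id: buffer-group\n";
        final KafkaBufferConfig config = new ObjectMapper(new YAMLFactory())
                .readValue(yaml, KafkaBufferConfig.class);
        System.out.println(config.getBootstrapServers()); // [localhost:9092]
        System.out.println(config.getTopic().getName());  // buffer-topic
    }
}
```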
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConnectionConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConnectionConfig.java new file mode 100644 index 0000000000..09d6cf3376 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConnectionConfig.java @@ -0,0 +1,11 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import java.util.Collection; + +public interface KafkaConnectionConfig { + Collection<String> getBootstrapServers(); + AuthConfig getAuthConfig(); + AwsConfig getAwsConfig(); + EncryptionConfig getEncryptionConfig(); + SchemaConfig getSchemaConfig(); +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java new file mode 100644 index 0000000000..27b16feb53 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java @@ -0,0 +1,14 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import java.util.List; + +public interface KafkaConsumerConfig extends KafkaConnectionConfig { + + String getClientDnsLookup(); + + boolean getAcknowledgementsEnabled(); + + SchemaConfig getSchemaConfig(); + + List<TopicConfig> getTopics(); +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerConfig.java new file mode 100644 index 0000000000..b08f97aca2 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaProducerConfig.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; + +import java.util.Collection; +import java.util.Optional; + +public interface KafkaProducerConfig extends KafkaConnectionConfig { + Collection<String> getBootstrapServers(); + + AuthConfig getAuthConfig(); + + SchemaConfig getSchemaConfig(); + + String getSerdeFormat(); + + TopicConfig getTopic(); + + KafkaProducerProperties getKafkaProducerProperties(); + + void setDlqConfig(PluginSetting pluginSetting); + + String getPartitionKey(); + Optional<PluginModel> getDlq(); +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java index 0e35750657..b4eb00c2a1 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java @@ -17,6 +17,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; /** @@ -24,7 +25,7 @@ * pipelines.yaml */ -public class KafkaSinkConfig { +public class KafkaSinkConfig implements KafkaProducerConfig { public static final String DLQ = "dlq"; @@ -72,7 +73,12 @@ public void setDlqConfig(final PluginSetting pluginSetting) { @JsonProperty("serde_format") private String serdeFormat; + @JsonProperty("encryption") + private EncryptionConfig encryptionConfig; + @JsonProperty("aws") + @Valid + private
AwsConfig awsConfig; @JsonProperty("partition_key") @NotNull @NotEmpty @@ -90,8 +96,20 @@ public AuthConfig getAuthConfig() { return authConfig; } + @Override + public EncryptionConfig getEncryptionConfig() { + if (Objects.isNull(encryptionConfig)) { + return new EncryptionConfig(); + } + return encryptionConfig; + } + @Override + public AwsConfig getAwsConfig() { + return awsConfig; + } + - public List<String> getBootStrapServers() { + public List<String> getBootstrapServers() { return bootStrapServers; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java index 38f11aefe2..060878fb17 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java @@ -18,22 +18,7 @@ * pipelines.yaml */ -public class KafkaSourceConfig { - public class EncryptionConfig { - @JsonProperty("type") - private EncryptionType type = EncryptionType.SSL; - - @JsonProperty("insecure") - private boolean insecure = false; - - public EncryptionType getType() { - return type; - } - - public boolean getInsecure() { - return insecure; - } - } +public class KafkaSourceConfig implements KafkaConsumerConfig { @JsonProperty("bootstrap_servers") private List<String> bootStrapServers; @@ -68,7 +53,7 @@ public String getClientDnsLookup() { return clientDnsLookup; } - public Boolean getAcknowledgementsEnabled() { + public boolean getAcknowledgementsEnabled() { return acknowledgementsEnabled; } @@ -80,9 +65,9 @@ public void setTopics(List<TopicConfig> topics) { this.topics = topics; } - public String getBootStrapServers() { + public List<String> getBootstrapServers() { if (Objects.nonNull(bootStrapServers)) { - return String.join(",", bootStrapServers); + return bootStrapServers; } return null; }
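Note the behavioral shift in the getter just above: getBootStrapServers used to return the servers pre-joined into a single comma-separated String, while getBootstrapServers now returns the raw List<String> and leaves formatting to callers. A standalone sketch of the adaptation a caller performs (Kafka accepts the joined form for bootstrap.servers):

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class BootstrapServersSketch {
    static Properties clientProperties(final List<String> bootstrapServers) {
        final Properties props = new Properties();
        // "host1:9092,host2:9092" is the comma-joined form the old getter produced.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
        return props;
    }
}
```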
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java similarity index 96% rename from data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java rename to data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 2604f4404d..bc80e7964b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -28,8 +28,8 @@ import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import org.opensearch.dataprepper.plugins.kafka.util.LogRateLimiter; @@ -58,9 +58,9 @@ /** * * A utility class which will handle the core Kafka consumer operation. */ -public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener { +public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumer.class); private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; static final String DEFAULT_KEY = "message"; @@ -89,14 +89,14 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private long numRecordsCommitted = 0; private final LogRateLimiter errLogRateLimiter; - public KafkaSourceCustomConsumer(final KafkaConsumer consumer, - final AtomicBoolean shutdownInProgress, - final Buffer<Record<Event>> buffer, - final KafkaSourceConfig sourceConfig, - final TopicConfig topicConfig, - final String schemaType, - final AcknowledgementSetManager acknowledgementSetManager, - KafkaTopicMetrics topicMetrics) { + public KafkaCustomConsumer(final KafkaConsumer consumer, + final AtomicBoolean shutdownInProgress, + final Buffer<Record<Event>> buffer, + final KafkaConsumerConfig consumerConfig, + final TopicConfig topicConfig, + final String schemaType, + final AcknowledgementSetManager acknowledgementSetManager, + KafkaTopicMetrics topicMetrics) { this.topicName = topicConfig.getName(); this.topicConfig = topicConfig; this.shutdownInProgress = shutdownInProgress; @@ -109,7 +109,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.metricsUpdatedTime = Instant.now().getEpochSecond(); this.acknowledgedOffsets = new ArrayList<>(); this.acknowledgementsTimeout = Duration.ofSeconds(Integer.MAX_VALUE); - this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled(); + this.acknowledgementsEnabled = consumerConfig.getAcknowledgementsEnabled(); this.acknowledgementSetManager = acknowledgementSetManager; this.partitionCommitTrackerMap = new HashMap<>(); this.partitionsToReset = Collections.synchronizedSet(new HashSet<>()); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java new file mode 100644 index 0000000000..0e544e7f02 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -0,0 +1,247 @@ +package org.opensearch.dataprepper.plugins.kafka.consumer; + +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import io.confluent.kafka.serializers.KafkaJsonDeserializer; +import kafka.common.BrokerEndPointNotAvailableException; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import
org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.BrokerNotAvailableException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + +public class KafkaCustomConsumerFactory { + private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumerFactory.class); + + private final StringDeserializer stringDeserializer = new StringDeserializer(); + private String schemaType = MessageFormat.PLAINTEXT.toString(); + + public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConfig kafkaConsumerConfig, final TopicConfig topic, + final Buffer<Record<Event>> buffer, final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager, + final AtomicBoolean shutdownInProgress) { + Properties authProperties = new Properties(); + KafkaSecurityConfigurer.setAuthProperties(authProperties, kafkaConsumerConfig, LOG); + KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); + Properties consumerProperties = getConsumerProperties(kafkaConsumerConfig, topic, authProperties); + MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); + + final List<KafkaCustomConsumer> consumers = new ArrayList<>(); + + try { + final int numWorkers = topic.getWorkers(); + IntStream.range(0, numWorkers).forEach(index -> { + final KafkaConsumer kafkaConsumer; + switch (schema) { + case JSON: + kafkaConsumer = new KafkaConsumer<String, JsonNode>(consumerProperties); + break; + case AVRO: + kafkaConsumer = new KafkaConsumer<String, GenericRecord>(consumerProperties); + break; + case BYTES: + kafkaConsumer = new KafkaConsumer<String, byte[]>(consumerProperties); + break; + case PLAINTEXT: + default: + final GlueSchemaRegistryKafkaDeserializer glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(kafkaConsumerConfig); + if (Objects.nonNull(glueDeserializer)) { + kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer); + } else { + kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties); + } + break; + } + consumers.add(new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, kafkaConsumerConfig, topic, + schemaType, acknowledgementSetManager, topicMetrics)); + + }); + } catch (Exception e) { + if (e instanceof BrokerNotAvailableException || + e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) { + LOG.error("The Kafka broker is not available."); + } else { + LOG.error("Failed to setup the Kafka Source Plugin.", e); + } + throw new RuntimeException(e); + } + + return consumers; + }
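createConsumersForTopic returns one consumer per configured worker, and each KafkaCustomConsumer is a Runnable, so callers drive them on their own threads. A sketch of the intended lifecycle, mirroring what the KafkaBuffer constructor earlier in this diff does:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;

class ConsumerLifecycleSketch {
    ExecutorService start(final KafkaConsumerConfig config, final TopicConfig topic,
                          final Buffer<Record<Event>> buffer, final PluginMetrics metrics,
                          final AcknowledgementSetManager ackManager,
                          final AtomicBoolean shutdownInProgress) {
        final List<KafkaCustomConsumer> consumers = new KafkaCustomConsumerFactory()
                .createConsumersForTopic(config, topic, buffer, metrics, ackManager, shutdownInProgress);
        final ExecutorService executorService = Executors.newFixedThreadPool(consumers.size());
        consumers.forEach(executorService::submit); // each consumer polls on its own thread
        return executorService; // set shutdownInProgress, then shut this down to stop
    }
}
```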
+ + private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig, final TopicConfig topicConfig, final Properties authProperties) { + Properties properties = (Properties)authProperties.clone(); + if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) { + ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup()); + switch (dnsLookupType) { + case USE_ALL_DNS_IPS: + properties.put("client.dns.lookup", ClientDNSLookupType.USE_ALL_DNS_IPS.toString()); + break; + case CANONICAL_BOOTSTRAP: + properties.put("client.dns.lookup", ClientDNSLookupType.CANONICAL_BOOTSTRAP.toString()); + break; + case DEFAULT: + properties.put("client.dns.lookup", ClientDNSLookupType.DEFAULT.toString()); + break; + } + } + setConsumerTopicProperties(properties, topicConfig); + setSchemaRegistryProperties(sourceConfig, properties, topicConfig); + LOG.debug("Starting consumer with the properties : {}", properties); + return properties; + } + + private void setConsumerTopicProperties(final Properties properties, final TopicConfig topicConfig) { + properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); + properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes()); + properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue()); + properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue()); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + topicConfig.getAutoCommit()); + properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + ((Long)topicConfig.getCommitInterval().toMillis()).intValue()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + topicConfig.getAutoOffsetReset()); + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + topicConfig.getConsumerMaxPollRecords()); + properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, + ((Long)topicConfig.getMaxPollInterval().toMillis()).intValue()); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue()); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue()); + properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes()); + properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait()); + properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes()); + } + + private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) { + SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig(); + if (Objects.isNull(schemaConfig)) {
setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(properties, topicConfig); + return; + } + + if (schemaConfig.getType() == SchemaRegistryType.AWS_GLUE) { + return; + } else if (schemaConfig.getType() == SchemaRegistryType.CONFLUENT) { + setupConfluentSchemaRegistry(schemaConfig, kafkaConsumerConfig, properties, topicConfig); + } + + } + + private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) { + MessageFormat dataFormat = topicConfig.getSerdeFormat(); + schemaType = dataFormat.toString(); + LOG.debug("Setting schemaType to {}", schemaType); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + switch (dataFormat) { + case JSON: + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceJsonDeserializer.class); + break; + case BYTES: + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + break; + default: + case PLAINTEXT: + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + break; + } + } + + private void setPropertiesForSchemaRegistryConnectivity(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties) { + AuthConfig authConfig = kafkaConsumerConfig.getAuthConfig(); + String schemaRegistryApiKey = kafkaConsumerConfig.getSchemaConfig().getSchemaRegistryApiKey(); + String schemaRegistryApiSecret = kafkaConsumerConfig.getSchemaConfig().getSchemaRegistryApiSecret(); + //with plaintext authentication for schema registry + if ("USER_INFO".equalsIgnoreCase(kafkaConsumerConfig.getSchemaConfig().getBasicAuthCredentialsSource()) + && authConfig.getSaslAuthConfig().getPlainTextAuthConfig() != null) { + String schemaBasicAuthUserInfo = schemaRegistryApiKey.concat(":").concat(schemaRegistryApiSecret); + properties.put("schema.registry.basic.auth.user.info", schemaBasicAuthUserInfo); + properties.put("basic.auth.credentials.source", "USER_INFO"); + } + + if (authConfig != null && authConfig.getSaslAuthConfig() != null) { + PlainTextAuthConfig plainTextAuthConfig = authConfig.getSaslAuthConfig().getPlainTextAuthConfig(); + OAuthConfig oAuthConfig = authConfig.getSaslAuthConfig().getOAuthConfig(); + if (oAuthConfig != null) { + properties.put("sasl.mechanism", oAuthConfig.getOauthSaslMechanism()); + properties.put("security.protocol", oAuthConfig.getOauthSecurityProtocol()); + } + } + } + + private void setPropertiesForSchemaType(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topic) { + Map prop = properties; + Map<String, String> propertyMap = (Map<String, String>) prop; + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl(kafkaConsumerConfig)); + properties.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false); + final CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(properties.getProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG), + 100, propertyMap); + try { + schemaType = schemaRegistryClient.getSchemaMetadata(topic.getName() + "-value", + kafkaConsumerConfig.getSchemaConfig().getVersion()).getSchemaType(); + } catch (IOException | RestClientException e) { + LOG.error("Failed to connect to the schema registry..."); + throw new RuntimeException(e); + } + if (schemaType.equalsIgnoreCase(MessageFormat.JSON.toString())) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class); + } else if (schemaType.equalsIgnoreCase(MessageFormat.AVRO.toString())) { + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); + } else { + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + } + } + + private String getSchemaRegistryUrl(final KafkaConsumerConfig kafkaConsumerConfig) { + return kafkaConsumerConfig.getSchemaConfig().getRegistryURL(); + } + + private void setupConfluentSchemaRegistry(final SchemaConfig schemaConfig, final KafkaConsumerConfig kafkaConsumerConfig, + final Properties properties, final TopicConfig topicConfig) { + if (StringUtils.isNotEmpty(schemaConfig.getRegistryURL())) { + setPropertiesForSchemaRegistryConnectivity(kafkaConsumerConfig, properties); + setPropertiesForSchemaType(kafkaConsumerConfig, properties, topicConfig); + } else { + throw new RuntimeException("RegistryURL must be specified for confluent schema registry"); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java index 0cfb7b63bd..dfc8a7355a 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicPartitionCommitTracker.java @@ -15,7 +15,7 @@ import java.util.Map; public class TopicPartitionCommitTracker { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(TopicPartitionCommitTracker.class); private long committedOffset; private long committedRecordCount; private long initialOffset; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java similarity index 82% rename from data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java rename to data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index 8b4c63b96f..edd94906c1 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -25,7 +25,7 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; import org.opensearch.dataprepper.plugins.kafka.service.SchemaService; import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; @@ -36,6 +36,8 @@ import java.util.Collection; import java.util.LinkedList; import java.util.Map; +import 
java.util.Objects; +import java.util.concurrent.Future; /** @@ -43,13 +45,13 @@ * and produce it to a given kafka topic */ -public class KafkaSinkProducer { +public class KafkaCustomProducer { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkProducer.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomProducer.class); private final Producer producer; - private final KafkaSinkConfig kafkaSinkConfig; + private final KafkaProducerConfig kafkaProducerConfig; private final DLQSink dlqSink; @@ -68,21 +70,21 @@ public class KafkaSinkProducer { private final SchemaService schemaService; - public KafkaSinkProducer(final Producer producer, - final KafkaSinkConfig kafkaSinkConfig, - final DLQSink dlqSink, - final ExpressionEvaluator expressionEvaluator, - final String tagTargetKey + public KafkaCustomProducer(final Producer producer, + final KafkaProducerConfig kafkaProducerConfig, + final DLQSink dlqSink, + final ExpressionEvaluator expressionEvaluator, + final String tagTargetKey ) { this.producer = producer; - this.kafkaSinkConfig = kafkaSinkConfig; + this.kafkaProducerConfig = kafkaProducerConfig; this.dlqSink = dlqSink; bufferedEventHandles = new LinkedList<>(); this.expressionEvaluator = expressionEvaluator; this.tagTargetKey = tagTargetKey; - this.topicName = ObjectUtils.isEmpty(kafkaSinkConfig.getTopic()) ? null : kafkaSinkConfig.getTopic().getName(); - this.serdeFormat = ObjectUtils.isEmpty(kafkaSinkConfig.getSerdeFormat()) ? null : kafkaSinkConfig.getSerdeFormat(); - schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaSinkConfig.getSchemaConfig()).build(); + this.topicName = ObjectUtils.isEmpty(kafkaProducerConfig.getTopic()) ? null : kafkaProducerConfig.getTopic().getName(); + this.serdeFormat = ObjectUtils.isEmpty(kafkaProducerConfig.getSerdeFormat()) ? 
null : kafkaProducerConfig.getSerdeFormat(); + schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build(); } @@ -92,7 +94,7 @@ public void produceRecords(final Record<Event> record) { bufferedEventHandles.add(record.getData().getEventHandle()); } Event event = getEvent(record); - final String key = event.formatString(kafkaSinkConfig.getPartitionKey(), expressionEvaluator); + final String key = event.formatString(kafkaProducerConfig.getPartitionKey(), expressionEvaluator); try { if (serdeFormat == MessageFormat.JSON.toString()) { publishJsonMessage(record, key); @@ -102,7 +104,7 @@ public void produceRecords(final Record<Event> record) { publishPlaintextMessage(record, key); } } catch (Exception e) { - LOG.error("Error occured while publishing " + e.getMessage()); + LOG.error("Error occurred while publishing " + e.getMessage()); releaseEventHandles(false); } @@ -113,7 +115,7 @@ private Event getEvent(final Record<Event> record) { try { event = addTagsToEvent(event, tagTargetKey); } catch (JsonProcessingException e) { - LOG.error("error occurred while processing tag target key"); + LOG.error("error occurred while processing tag target key"); } return event; } @@ -132,8 +134,12 @@ private void publishAvroMessage(final Record<Event> record, final String key) { send(topicName, key, genericRecord); } - private void send(final String topicName, final String key, final Object record) { - producer.send(new ProducerRecord(topicName, key, record), callBack(record)); + private Future send(final String topicName, String key, final Object record) { + if (Objects.isNull(key)) { + return producer.send(new ProducerRecord(topicName, record), callBack(record)); + } + + return producer.send(new ProducerRecord(topicName, key, record), callBack(record)); } private void publishJsonMessage(final Record<Event> record, final String key) throws IOException, ProcessingException {
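The reworked send above now returns the client's Future and omits the record key entirely when none is set. With the plain Kafka client, a keyed record is hashed to a fixed partition, while records sent without a key (or with a null key) are spread by the default partitioner. A standalone illustration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

class NullKeySendSketch {
    static void demo(final Properties props) {
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Keyed: hash(key) pins every record with this key to one partition.
            producer.send(new ProducerRecord<>("demo-topic", "some-key", "keyed value"));
            // Unkeyed: the default partitioner distributes records (sticky batches).
            producer.send(new ProducerRecord<>("demo-topic", "unkeyed value"));
        }
    }
}
```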
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java new file mode 100644 index 0000000000..bd47bedbcc --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java @@ -0,0 +1,63 @@ +package org.opensearch.dataprepper.plugins.kafka.producer; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; +import org.opensearch.dataprepper.plugins.kafka.service.SchemaService; +import org.opensearch.dataprepper.plugins.kafka.service.TopicService; +import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer; +import org.opensearch.dataprepper.plugins.kafka.util.RestUtils; +import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.Properties; + +public class KafkaCustomProducerFactory { + private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomProducerFactory.class); + + public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig, final PluginFactory pluginFactory, final PluginSetting pluginSetting, + final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext) { + prepareTopicAndSchema(kafkaProducerConfig); + Properties properties = SinkPropertyConfigurer.getProducerProperties(kafkaProducerConfig); + KafkaSecurityConfigurer.setAuthProperties(properties, kafkaProducerConfig, LOG); + properties = Objects.requireNonNull(properties); + return new KafkaCustomProducer(new org.apache.kafka.clients.producer.KafkaProducer<>(properties), + kafkaProducerConfig, new DLQSink(pluginFactory, kafkaProducerConfig, pluginSetting), + expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null); + } + private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig) { + checkTopicCreationCriteriaAndCreateTopic(kafkaProducerConfig); + final SchemaConfig schemaConfig = kafkaProducerConfig.getSchemaConfig(); + if (schemaConfig != null) { + if (schemaConfig.isCreate()) { + final RestUtils restUtils = new RestUtils(schemaConfig); + final String topic = kafkaProducerConfig.getTopic().getName(); + final SchemaService schemaService = new SchemaService.SchemaServiceBuilder() + .getRegisterationAndCompatibilityService(topic, kafkaProducerConfig.getSerdeFormat(), + restUtils, schemaConfig).build(); + schemaService.registerSchema(topic); + } + + } + + } + + private void checkTopicCreationCriteriaAndCreateTopic(final KafkaProducerConfig kafkaProducerConfig) { + final TopicConfig topic = kafkaProducerConfig.getTopic(); + if (topic.isCreate()) { + final TopicService topicService = new TopicService(kafkaProducerConfig); + topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartions(), topic.getReplicationFactor()); + topicService.closeAdminClient(); + } + + + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java index dbb05e7401..e8764f00f9 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorker.java @@ -16,10 +16,10 @@ public class ProducerWorker implements Runnable { private final Record<Event> record; - private final KafkaSinkProducer producer; + private final KafkaCustomProducer producer; - public ProducerWorker(final KafkaSinkProducer producer, + public ProducerWorker(final KafkaCustomProducer producer, final Record<Event> record) { this.record = record; this.producer = producer;
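For callers, the factory collapses topic creation, schema registration, security configuration, and DLQ setup into one call. The buffer path shown earlier passes null for the expression evaluator and sink context, since neither applies outside a sink; a sketch of that usage:

```java
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;

class ProducerFactorySketch {
    KafkaCustomProducer forBuffer(final KafkaProducerConfig config,
                                  final PluginFactory pluginFactory,
                                  final PluginSetting pluginSetting) {
        // Same call KafkaBuffer makes: no ExpressionEvaluator, no SinkContext.
        return new KafkaCustomProducerFactory()
                .createProducer(config, pluginFactory, pluginSetting, null, null);
    }
}
```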
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/service/TopicService.java @@ -6,7 +6,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,8 +18,8 @@ public class TopicService { private final AdminClient adminClient; - public TopicService(final KafkaSinkConfig sinkConfig) { - this.adminClient = AdminClient.create(SinkPropertyConfigurer.getPropertiesForAdmintClient(sinkConfig)); + public TopicService(final KafkaProducerConfig kafkaProducerConfig) { + this.adminClient = AdminClient.create(SinkPropertyConfigurer.getPropertiesForAdminClient(kafkaProducerConfig)); } public void createTopic(final String topicName, final Integer numberOfPartitions, final Short replicationFactor) { @@ -29,7 +29,7 @@ public void createTopic(final String topicName, final Integer numberOfPartitions LOG.info(topicName + " created successfully"); } catch (Exception e) { - LOG.info(topicName + " Topic already created so using the existing one"); + LOG.error("Caught exception creating topic with name: {}", topicName, e); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java index 7a82a50253..bea5046365 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java @@ -13,7 +13,7 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +37,9 @@ public class DLQSink { private final DlqProvider dlqProvider; private final PluginSetting pluginSetting; - public DLQSink(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig, final PluginSetting pluginSetting) { + public DLQSink(final PluginFactory pluginFactory, final KafkaProducerConfig kafkaProducerConfig, final PluginSetting pluginSetting) { this.pluginSetting = pluginSetting; - this.dlqProvider = getDlqProvider(pluginFactory, kafkaSinkConfig); + this.dlqProvider = getDlqProvider(pluginFactory, kafkaProducerConfig); } public void perform(final Object failedData, final Exception e) { @@ -61,9 +61,9 @@ private DlqWriter getDlqWriter() { return dlqWriter; } - private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaSinkConfig kafkaSinkConfig) { - kafkaSinkConfig.setDlqConfig(pluginSetting); - final Optional dlq = kafkaSinkConfig.getDlq(); + private DlqProvider getDlqProvider(final PluginFactory pluginFactory, final KafkaProducerConfig kafkaProducerConfig) { + kafkaProducerConfig.setDlqConfig(pluginSetting); + final Optional dlq = kafkaProducerConfig.getDlq(); if (dlq.isPresent()) { final PluginModel dlqPluginModel = dlq.get(); final PluginSetting 
dlqPluginSetting = new PluginSetting(dlqPluginModel.getPluginName(), dlqPluginModel.getPluginSettings()); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java index ccac810171..eb1e7a3a8b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java @@ -20,7 +20,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.producer.KafkaSinkProducer; +import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; import org.opensearch.dataprepper.plugins.kafka.service.SchemaService; import org.opensearch.dataprepper.plugins.kafka.service.TopicService; @@ -111,7 +111,7 @@ public void doOutput(Collection> records) { } try { prepareTopicAndSchema(); - final KafkaSinkProducer producer = createProducer(); + final KafkaCustomProducer producer = createProducer(); records.forEach(record -> { producerWorker = new ProducerWorker(producer, record); executorService.submit(producerWorker); @@ -152,10 +152,10 @@ private void checkTopicCreationCriteriaAndCreateTopic() { } - public KafkaSinkProducer createProducer() { + public KafkaCustomProducer createProducer() { Properties properties = SinkPropertyConfigurer.getProducerProperties(kafkaSinkConfig); properties = Objects.requireNonNull(properties); - return new KafkaSinkProducer(new KafkaProducer<>(properties), + return new KafkaCustomProducer(new KafkaProducer<>(properties), kafkaSinkConfig, new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting), expressionEvaluator, Objects.nonNull(sinkContext) ? 
sinkContext.getTagsTargetKey() : null); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 53994dc126..a74c1c4737 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -41,10 +41,10 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaSourceCustomConsumer; +import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer; import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; -import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceSecurityConfigurer; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.slf4j.Logger; @@ -90,7 +90,7 @@ public class KafkaSource implements Source> { private AtomicBoolean shutdownInProgress; private ExecutorService executorService; private final PluginMetrics pluginMetrics; - private KafkaSourceCustomConsumer consumer; + private KafkaCustomConsumer consumer; private KafkaConsumer kafkaConsumer; private String pipelineName; private String consumerGroupID; @@ -101,7 +101,7 @@ public class KafkaSource implements Source> { private GlueSchemaRegistryKafkaDeserializer glueDeserializer; private StringDeserializer stringDeserializer; private final List allTopicExecutorServices; - private final List allTopicConsumers; + private final List allTopicConsumers; @DataPrepperPluginConstructor public KafkaSource(final KafkaSourceConfig sourceConfig, @@ -121,7 +121,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, @Override public void start(Buffer> buffer) { Properties authProperties = new Properties(); - KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); + KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); @@ -153,7 +153,7 @@ public void start(Buffer> buffer) { } } - consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics); + consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics); allTopicConsumers.add(consumer); executorService.submit(consumer); @@ -179,7 +179,7 @@ public void start(Buffer> buffer) { return new KafkaConsumer(consumerProperties); case PLAINTEXT: default: - glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig); + glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig); if (Objects.nonNull(glueDeserializer)) { return new KafkaConsumer(consumerProperties, 
stringDeserializer, glueDeserializer); } else { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java index b824634ac9..a0241080ee 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/AuthenticationPropertyConfigurer.java @@ -11,7 +11,7 @@ import java.util.Properties; /** - * * This is static property configurer dedicated to authencation related information given in pipeline.yml + * * This is a static property configurer dedicated to authentication-related information given in pipeline.yml */ public class AuthenticationPropertyConfigurer { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java similarity index 88% rename from data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java rename to data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java index 77fcd6e2fc..4f338367bd 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java @@ -7,7 +7,9 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConnectionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; @@ -30,7 +32,6 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import com.amazonaws.services.schemaregistry.utils.AvroRecordType; -import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; import software.amazon.awssdk.services.glue.model.Compatibility; import org.slf4j.Logger; @@ -46,7 +47,7 @@ * * This is a static property configurer dedicated to authentication-related information given in pipeline.yml */ -public class KafkaSourceSecurityConfigurer { +public class KafkaSecurityConfigurer { private static final String SASL_MECHANISM = "sasl.mechanism"; @@ -119,9 +120,9 @@ private static void setPlainTextAuthProperties(Properties properties, final Plai } } - public static void setOauthProperties(final KafkaSourceConfig kafkaSourConfig, + 
public static void setOauthProperties(final KafkaConnectionConfig kafkaConsumerConfig, final Properties properties) { - final OAuthConfig oAuthConfig = kafkaSourConfig.getAuthConfig().getSaslAuthConfig().getOAuthConfig(); + final OAuthConfig oAuthConfig = kafkaConsumerConfig.getAuthConfig().getSaslAuthConfig().getOAuthConfig(); final String oauthClientId = oAuthConfig.getOauthClientId(); final String oauthClientSecret = oAuthConfig.getOauthClientSecret(); final String oauthLoginServer = oAuthConfig.getOauthLoginServer(); @@ -156,9 +157,9 @@ public static void setOauthProperties(final KafkaSourceConfig kafkaSourConfig, String jass_config = String.format(OAUTH_JAASCONFIG, oauthClientId, oauthClientSecret, oauthLoginScope, oauthLoginServer, oauthLoginEndpoint, oauthLoginGrantType, oauthLoginScope, oauthAuthorizationToken, instrospect_properties); - if ("USER_INFO".equalsIgnoreCase(kafkaSourConfig.getSchemaConfig().getBasicAuthCredentialsSource())) { - final String apiKey = kafkaSourConfig.getSchemaConfig().getSchemaRegistryApiKey(); - final String apiSecret = kafkaSourConfig.getSchemaConfig().getSchemaRegistryApiSecret(); + if ("USER_INFO".equalsIgnoreCase(kafkaConsumerConfig.getSchemaConfig().getBasicAuthCredentialsSource())) { + final String apiKey = kafkaConsumerConfig.getSchemaConfig().getSchemaRegistryApiKey(); + final String apiSecret = kafkaConsumerConfig.getSchemaConfig().getSchemaRegistryApiSecret(); final String extensionLogicalCluster = oAuthConfig.getExtensionLogicalCluster(); final String extensionIdentityPoolId = oAuthConfig.getExtensionIdentityPoolId(); properties.put(REGISTRY_BASIC_AUTH_USER_INFO, apiKey + ":" + apiSecret); @@ -175,10 +176,18 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu properties.put(SASL_MECHANISM, "AWS_MSK_IAM"); properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) { - properties.put(SASL_JAAS_CONFIG, - "software.amazon.msk.auth.iam.IAMLoginModule required " + - "awsRoleArn=\"" + awsConfig.getStsRoleArn() + - "\" awsStsRegion=\"" + awsConfig.getRegion() + "\";"); + String baseIamAuthConfig = "software.amazon.msk.auth.iam.IAMLoginModule required " + + "awsRoleArn=\"%s\" " + + "awsStsRegion=\"%s\""; + + baseIamAuthConfig = String.format(baseIamAuthConfig, awsConfig.getStsRoleArn(), awsConfig.getRegion()); + + if (Objects.nonNull(awsConfig.getStsRoleSessionName())) { + baseIamAuthConfig += String.format(" awsRoleSessionName=\"%s\"", awsConfig.getStsRoleSessionName()); + } + + baseIamAuthConfig += ";"; + properties.put(SASL_JAAS_CONFIG, baseIamAuthConfig); } else if (awsIamAuthConfig == AwsIamAuthConfig.DEFAULT) { properties.put(SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;"); @@ -247,15 +256,18 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth } } - public static void setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) { - final AwsConfig awsConfig = sourceConfig.getAwsConfig(); - final AuthConfig authConfig = sourceConfig.getAuthConfig(); - final KafkaSourceConfig.EncryptionConfig encryptionConfig = sourceConfig.getEncryptionConfig(); + public static void setAuthProperties(Properties properties, final KafkaConnectionConfig consumerConfig, final Logger LOG) { + final AwsConfig awsConfig = consumerConfig.getAwsConfig(); + final AuthConfig authConfig = consumerConfig.getAuthConfig(); + final EncryptionConfig 
encryptionConfig = consumerConfig.getEncryptionConfig(); final EncryptionType encryptionType = encryptionConfig.getType(); credentialsProvider = DefaultCredentialsProvider.create(); - String bootstrapServers = sourceConfig.getBootStrapServers(); + String bootstrapServers = ""; + if (Objects.nonNull(consumerConfig.getBootstrapServers())) { + bootstrapServers = String.join(",", consumerConfig.getBootstrapServers()); + } AwsIamAuthConfig awsIamAuthConfig = null; if (Objects.nonNull(authConfig)) { AuthConfig.SaslAuthConfig saslAuthConfig = authConfig.getSaslAuthConfig(); @@ -273,7 +285,7 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon setAwsIamAuthProperties(properties, awsIamAuthConfig, awsConfig); bootstrapServers = getBootStrapServersForMsk(awsIamAuthConfig, awsConfig, LOG); } else if (Objects.nonNull(saslAuthConfig.getOAuthConfig())) { - setOauthProperties(sourceConfig, properties); + setOauthProperties(consumerConfig, properties); } else if (Objects.nonNull(plainTextAuthConfig)) { setPlainTextAuthProperties(properties, plainTextAuthConfig, encryptionType); } else { @@ -295,13 +307,13 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } - public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaSourceConfig sourceConfig) { - SchemaConfig schemaConfig = sourceConfig.getSchemaConfig(); + public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaConsumerConfig kafkaConsumerConfig) { + SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig(); if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) { return null; } Map configs = new HashMap(); - configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion()); + configs.put(AWSSchemaRegistryConstants.AWS_REGION, kafkaConsumerConfig.getAwsConfig().getRegion()); configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/MessageFormat.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/MessageFormat.java index f2074f1ab6..025ff0806e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/MessageFormat.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/MessageFormat.java @@ -17,7 +17,7 @@ */ public enum MessageFormat { - PLAINTEXT("plaintext"), JSON("json"), AVRO("avro"); + PLAINTEXT("plaintext"), JSON("json"), AVRO("avro"), BYTES("bytes"); private static final Map MESSAGE_FORMAT_MAP = Arrays.stream(MessageFormat.values()) .collect(Collectors.toMap( diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java index 2871b03459..5d54f688d2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java +++ 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java @@ -13,13 +13,14 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.connect.json.JsonSerializer; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Objects; import java.util.Properties; /** @@ -107,26 +108,26 @@ public class SinkPropertyConfigurer { public static final String RETRY_BACKOFF_MS = "retry.backoff.ms"; - public static Properties getProducerProperties(final KafkaSinkConfig kafkaSinkConfig) { + public static Properties getProducerProperties(final KafkaProducerConfig kafkaProducerConfig) { final Properties properties = new Properties(); - setCommonServerProperties(properties, kafkaSinkConfig); + setCommonServerProperties(properties, kafkaProducerConfig); - setPropertiesForSerializer(properties, kafkaSinkConfig.getSerdeFormat()); - - if (kafkaSinkConfig.getSchemaConfig() != null) { - setSchemaProps(kafkaSinkConfig.getSerdeFormat(), kafkaSinkConfig.getSchemaConfig(), properties); + setPropertiesForSerializer(properties, kafkaProducerConfig.getSerdeFormat()); + + if (kafkaProducerConfig.getSchemaConfig() != null) { + setSchemaProps(kafkaProducerConfig.getSerdeFormat(), kafkaProducerConfig.getSchemaConfig(), properties); } - if (kafkaSinkConfig.getKafkaProducerProperties() != null) { - setPropertiesProviderByKafkaProducer(kafkaSinkConfig.getKafkaProducerProperties(), properties); + if (kafkaProducerConfig.getKafkaProducerProperties() != null) { + setPropertiesProviderByKafkaProducer(kafkaProducerConfig.getKafkaProducerProperties(), properties); } - setAuthProperties(kafkaSinkConfig, properties); + setAuthProperties(kafkaProducerConfig, properties); return properties; } - private static void setAuthProperties(final KafkaSinkConfig kafkaSinkConfig, final Properties properties) { + private static void setAuthProperties(final KafkaProducerConfig kafkaSinkConfig, final Properties properties) { if (kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig() != null) { final String sslEndpointAlgorithm = kafkaSinkConfig.getAuthConfig().getSaslAuthConfig().getSslEndpointAlgorithm(); if (null != sslEndpointAlgorithm && !sslEndpointAlgorithm.isEmpty() && sslEndpointAlgorithm.equalsIgnoreCase("https")) { @@ -144,8 +145,10 @@ private static void setAuthProperties(final KafkaSinkConfig kafkaSinkConfig, fin } - private static void setCommonServerProperties(final Properties properties, final KafkaSinkConfig kafkaSinkConfig) { - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootStrapServers()); + private static void setCommonServerProperties(final Properties properties, final KafkaProducerConfig kafkaSinkConfig) { + if (Objects.nonNull(kafkaSinkConfig.getBootstrapServers())){ + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaSinkConfig.getBootstrapServers())); + } properties.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG); } @@ -323,11 +326,11 @@ private static void 
setPropertiesProviderByKafkaProducer(final KafkaProducerProp LOG.info("Producer properties ends"); } - public static Properties getPropertiesForAdmintClient(final KafkaSinkConfig kafkaSinkConfig) { + public static Properties getPropertiesForAdminClient(final KafkaProducerConfig kafkaProducerConfig) { Properties properties = new Properties(); - setCommonServerProperties(properties, kafkaSinkConfig); - setAuthProperties(kafkaSinkConfig, properties); - properties.put(TopicConfig.RETENTION_MS_CONFIG,kafkaSinkConfig.getTopic().getRetentionPeriod()); + setCommonServerProperties(properties, kafkaProducerConfig); + KafkaSecurityConfigurer.setAuthProperties(properties, kafkaProducerConfig, LOG); + properties.put(TopicConfig.RETENTION_MS_CONFIG,kafkaProducerConfig.getTopic().getRetentionPeriod()); return properties; } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java new file mode 100644 index 0000000000..fd304d810d --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -0,0 +1,222 @@ +package org.opensearch.dataprepper.plugins.kafka.buffer; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; +import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory; +import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class KafkaBufferTest { + + private static final String TEST_GROUP_ID = "testGroupId"; + + private KafkaBuffer> kafkaBuffer; + ExecutorService executorService; + @Mock + private KafkaBufferConfig bufferConfig; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private PluginSetting pluginSetting; + + @Mock + private PluginFactory pluginFactory; + + @Mock + TopicConfig topic1; + @Mock + AuthConfig authConfig; + @Mock + AuthConfig.SaslAuthConfig saslAuthConfig; + @Mock + PlainTextAuthConfig plainTextAuthConfig; + + @Mock + private EncryptionConfig encryptionConfig; + + @Mock + FutureTask futureTask; + + @Mock + KafkaCustomProducerFactory producerFactory; + + @Mock + KafkaCustomProducer producer; + + @Mock + BlockingBuffer> blockingBuffer; + + public KafkaBuffer> createObjectUnderTest() { + + try ( + final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedConstruction producerFactoryMock = + mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> { + producerFactory = mock; + when(producerFactory.createProducer(any() ,any(), any(), isNull(), isNull())).thenReturn(producer); + }); + final MockedConstruction blockingBufferMock = + mockConstruction(BlockingBuffer.class, (mock, context) -> { + blockingBuffer = mock; + })) { + + executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executorService); + return new KafkaBuffer>(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics); + } + } + + + + @BeforeEach + void setUp() { + when(pluginSetting.getPipelineName()).thenReturn("pipeline"); + pluginMetrics = mock(PluginMetrics.class); + acknowledgementSetManager = mock(AcknowledgementSetManager.class); + when(topic1.getName()).thenReturn("topic1"); + when(topic1.isCreate()).thenReturn(true); + + when(topic1.getWorkers()).thenReturn(2); + when(topic1.getCommitInterval()).thenReturn(Duration.ofSeconds(1)); + when(topic1.getAutoOffsetReset()).thenReturn("earliest"); + when(topic1.getConsumerMaxPollRecords()).thenReturn(1); + when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID); + when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); + when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5)); + when(topic1.getAutoCommit()).thenReturn(false); + when(topic1.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); + when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); + when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); + + when(bufferConfig.getBootstrapServers()).thenReturn(Collections.singletonList("http://localhost:1234")); + when(bufferConfig.getTopic()).thenReturn(topic1); + + when(bufferConfig.getSchemaConfig()).thenReturn(null); + when(bufferConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); + when(bufferConfig.getSerdeFormat()).thenReturn("plaintext"); + + 
when(bufferConfig.getAuthConfig()).thenReturn(authConfig); + when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig); + when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig); + when(plainTextAuthConfig.getUsername()).thenReturn("username"); + when(plainTextAuthConfig.getPassword()).thenReturn("password"); + + executorService = mock(ExecutorService.class); + when(executorService.submit(any(ProducerWorker.class))).thenReturn(futureTask); + + } + + @Test + void test_kafkaBuffer_basicFunctionality() throws TimeoutException { + kafkaBuffer = createObjectUnderTest(); + assertTrue(Objects.nonNull(kafkaBuffer)); + + Record record = new Record(JacksonEvent.fromMessage(UUID.randomUUID().toString())); + kafkaBuffer.doWrite(record, 10000); + kafkaBuffer.doRead(10000); + verify(producer).produceRecords(record); + verify(blockingBuffer).doRead(anyInt()); + } + + @Test + void test_kafkaBuffer_producerThrows() throws TimeoutException { + + kafkaBuffer = createObjectUnderTest(); + Record record = new Record(JacksonEvent.fromMessage(UUID.randomUUID().toString())); + doThrow(new RuntimeException("Producer Error")) + .when(producer).produceRecords(record); + + assertThrows(RuntimeException.class, () -> kafkaBuffer.doWrite(record, 10000)); + } + + @Test + void test_kafkaBuffer_doWriteAll() throws Exception { + kafkaBuffer = createObjectUnderTest(); + assertTrue(Objects.nonNull(kafkaBuffer)); + + Record record = new Record(JacksonEvent.fromMessage(UUID.randomUUID().toString())); + Record record2 = new Record(JacksonEvent.fromMessage(UUID.randomUUID().toString())); + + kafkaBuffer.doWriteAll(Arrays.asList(record,record2), 10000); + verify(producer).produceRecords(record); + verify(producer).produceRecords(record2); + } + + @Test + void test_kafkaBuffer_isEmpty() { + kafkaBuffer = createObjectUnderTest(); + assertTrue(Objects.nonNull(kafkaBuffer)); + + kafkaBuffer.isEmpty(); + verify(blockingBuffer).isEmpty(); + } + + @Test + void test_kafkaBuffer_doCheckpoint() { + kafkaBuffer = createObjectUnderTest(); + kafkaBuffer.doCheckpoint(mock(CheckpointState.class)); + verify(blockingBuffer).doCheckpoint(any()); + } + + @Test + void test_kafkaBuffer_postProcess() { + kafkaBuffer = createObjectUnderTest(); + kafkaBuffer.postProcess(0L); + verify(blockingBuffer).postProcess(0L); + } + + + +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java index de3308a1f0..e3c4d6fde8 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java @@ -49,8 +49,8 @@ void test_kafka_config_not_null() { @Test void test_bootStrapServers_not_null() { - assertThat(kafkaSinkConfig.getBootStrapServers(), notNullValue()); - List servers = kafkaSinkConfig.getBootStrapServers(); + assertThat(kafkaSinkConfig.getBootstrapServers(), notNullValue()); + List servers = kafkaSinkConfig.getBootstrapServers(); bootstrapServers = servers.stream(). flatMap(str -> Arrays.stream(str.split(","))). map(String::trim). 
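For context on the bootstrap-server changes in the tests above: the `getBootStrapServers` to `getBootstrapServers` rename also moves the config from a single string to a `List<String>`, which the property configurers join into the comma-separated value Kafka expects and skip entirely when absent (for example when brokers are resolved via MSK, as the security-configurer hunk suggests). A minimal sketch of that pattern, with names assumed from the diff rather than taken from a published API:

```
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;

// Sketch (assumed names): join the configured server list into the
// comma-separated form Kafka expects, and leave the property unset when
// no servers are configured so another resolution path can supply them.
final class BootstrapServersSketch {
    static void setBootstrapServers(final Properties properties, final List<String> bootstrapServers) {
        if (Objects.nonNull(bootstrapServers)) {
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
        }
    }
}
```

The null guard matters because, unlike the old single-string getter, an unset list would otherwise turn into the literal string "null" in the client properties.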
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java index 2da916d79f..b2634659cc 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java @@ -57,8 +57,8 @@ void test_kafka_config_not_null() { @Test void test_bootStrapServers_not_null(){ - assertThat(kafkaSourceConfig.getBootStrapServers(), notNullValue()); - String bootstrapServers = kafkaSourceConfig.getBootStrapServers(); + assertThat(kafkaSourceConfig.getBootstrapServers(), notNullValue()); + String bootstrapServers = kafkaSourceConfig.getBootstrapServers().get(0); assertTrue(bootstrapServers.contains("127.0.0.1:9093")); } @@ -71,19 +71,19 @@ void test_topics_not_null(){ @Test void test_setters() throws NoSuchFieldException, IllegalAccessException { kafkaSourceConfig = new KafkaSourceConfig(); - KafkaSourceConfig.EncryptionConfig encryptionConfig = kafkaSourceConfig.getEncryptionConfig(); + EncryptionConfig encryptionConfig = kafkaSourceConfig.getEncryptionConfig(); kafkaSourceConfig.setBootStrapServers(new ArrayList<>(Arrays.asList("127.0.0.1:9092"))); TopicConfig topicConfig = mock(TopicConfig.class); kafkaSourceConfig.setTopics(Collections.singletonList(topicConfig)); - assertEquals("127.0.0.1:9092", kafkaSourceConfig.getBootStrapServers()); + assertEquals(Collections.singletonList("127.0.0.1:9092"), kafkaSourceConfig.getBootstrapServers()); assertEquals(Collections.singletonList(topicConfig), kafkaSourceConfig.getTopics()); setField(KafkaSourceConfig.class, kafkaSourceConfig, "acknowledgementsEnabled", true); assertEquals(true, kafkaSourceConfig.getAcknowledgementsEnabled()); assertEquals(EncryptionType.SSL, kafkaSourceConfig.getEncryptionConfig().getType()); - setField(KafkaSourceConfig.EncryptionConfig.class, encryptionConfig, "type", EncryptionType.NONE); + setField(EncryptionConfig.class, encryptionConfig, "type", EncryptionType.NONE); assertEquals(EncryptionType.NONE, encryptionConfig.getType()); - setField(KafkaSourceConfig.EncryptionConfig.class, encryptionConfig, "type", EncryptionType.SSL); + setField(EncryptionConfig.class, encryptionConfig, "type", EncryptionType.SSL); assertEquals(EncryptionType.SSL, encryptionConfig.getType()); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java similarity index 98% rename from data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java rename to data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index 1e61146e55..fda9252117 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -61,7 +61,7 @@ @ExtendWith(MockitoExtension.class) 
@MockitoSettings(strictness = Strictness.LENIENT) -public class KafkaSourceCustomConsumerTest { +public class KafkaCustomConsumerTest { @Mock private KafkaConsumer kafkaConsumer; @@ -82,7 +82,7 @@ public class KafkaSourceCustomConsumerTest { @Mock private KafkaTopicMetrics topicMetrics; - private KafkaSourceCustomConsumer consumer; + private KafkaCustomConsumer consumer; private ConsumerRecords consumerRecords; @@ -144,9 +144,9 @@ public void setUp() { when(topicConfig.getName()).thenReturn("topic1"); } - public KafkaSourceCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) { + public KafkaCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) { when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(acknowledgementsEnabled); - return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, topicMetrics); + return new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, topicMetrics); } private BlockingBuffer> getBuffer() { diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java similarity index 88% rename from data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java rename to data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index fc43ce2f13..5216537f9f 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -48,16 +48,16 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) -public class KafkaSinkProducerTest { +public class KafkaCustomProducerTest { - private KafkaSinkProducer producer; + private KafkaCustomProducer producer; @Mock private KafkaSinkConfig kafkaSinkConfig; private Record record; - KafkaSinkProducer sinkProducer; + KafkaCustomProducer sinkProducer; @Mock private DLQSink dlqSink; @@ -84,7 +84,7 @@ record = new Record<>(event); public void producePlainTextRecordsTest() throws ExecutionException, InterruptedException { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); - producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); + producer = new KafkaCustomProducer(mockProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); sinkProducer = spy(producer); sinkProducer.produceRecords(record); verify(sinkProducer).produceRecords(record); @@ -96,7 +96,7 @@ public void producePlainTextRecordsTest() throws ExecutionException, Interrupted public void produceJsonRecordsTest() throws RestClientException, IOException { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("JSON"); MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new JsonSerializer()); - producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, 
mock(ExpressionEvaluator.class), null); + producer = new KafkaCustomProducer(mockProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); SchemaMetadata schemaMetadata = mock(SchemaMetadata.class); String jsonSchema = "{\n" + " \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n" + @@ -119,7 +119,7 @@ public void produceJsonRecordsTest() throws RestClientException, IOException { public void produceAvroRecordsTest() throws Exception { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("AVRO"); MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); - producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); + producer = new KafkaCustomProducer(mockProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); SchemaMetadata schemaMetadata = mock(SchemaMetadata.class); String avroSchema = "{\"type\":\"record\",\"name\":\"MyMessage\",\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}"; when(schemaMetadata.getSchema()).thenReturn(avroSchema); @@ -131,9 +131,9 @@ public void produceAvroRecordsTest() throws Exception { @Test public void testGetGenericRecord() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - producer = new KafkaSinkProducer(new MockProducer(), kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); + producer = new KafkaCustomProducer(new MockProducer(), kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); final Schema schema = createMockSchema(); - Method privateMethod = KafkaSinkProducer.class.getDeclaredMethod("getGenericRecord", Event.class, Schema.class); + Method privateMethod = KafkaCustomProducer.class.getDeclaredMethod("getGenericRecord", Event.class, Schema.class); privateMethod.setAccessible(true); GenericRecord result = (GenericRecord) privateMethod.invoke(producer, event, schema); Assertions.assertNotNull(result); @@ -149,7 +149,7 @@ private Schema createMockSchema() { public void validateSchema() throws IOException, ProcessingException { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("avro"); MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); - producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); + producer = new KafkaCustomProducer(mockProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), null); String jsonSchema = "{\"type\": \"object\",\"properties\": {\"Year\": {\"type\": \"string\"},\"Age\": {\"type\": \"string\"},\"Ethnic\": {\"type\":\"string\",\"default\": null}}}"; String jsonSchema2 = "{\"type\": \"object\",\"properties\": {\"Year\": {\"type\": \"string\"},\"Age\": {\"type\": \"string\"},\"Ethnic\": {\"type\":\"string\",\"default\": null}}}"; assertTrue(producer.validateSchema(jsonSchema, jsonSchema2)); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java index 65492af774..d8b06ce9ca 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java @@ -35,7 +35,7 @@ record = new Record<>(event); private ProducerWorker 
createObjectUnderTest() { - return new ProducerWorker(mock(KafkaSinkProducer.class), record); + return new ProducerWorker(mock(KafkaCustomProducer.class), record); } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java index 36fea63bbe..bc79f330d3 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; @@ -37,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Collections; import java.util.Objects; import java.time.Duration; @@ -49,7 +51,7 @@ class KafkaSourceTest { private KafkaSourceConfig sourceConfig; @Mock - private KafkaSourceConfig.EncryptionConfig encryptionConfig; + private EncryptionConfig encryptionConfig; @Mock private PluginMetrics pluginMetrics; @@ -78,7 +80,7 @@ public KafkaSource createObjectUnderTest() { @BeforeEach void setUp() throws Exception { sourceConfig = mock(KafkaSourceConfig.class); - encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class); + encryptionConfig = mock(EncryptionConfig.class); pipelineDescription = mock(PipelineDescription.class); pluginMetrics = mock(PluginMetrics.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); @@ -104,7 +106,7 @@ void setUp() throws Exception { when(topic2.getAutoCommit()).thenReturn(false); when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); when(topic2.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); - when(sourceConfig.getBootStrapServers()).thenReturn("http://localhost:1234"); + when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList("http://localhost:1234")); when(sourceConfig.getTopics()).thenReturn(Arrays.asList(topic1, topic2)); when(sourceConfig.getSchemaConfig()).thenReturn(null); when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml index 18eb541d87..cb735d6736 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml @@ -4,7 +4,8 @@ log-pipeline: bootstrap_servers: - 127.0.0.1:9093 client_dns_lookup: use_all_dns_ips - encryption: plaintext + encryption: + type: ssl topics: - name: my-topic-1 group_id: my-test-group
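As a closing illustration of the `setAwsIamAuthProperties` change above: the JAAS line is now assembled with `String.format`, and the optional `awsRoleSessionName` is appended only when a session name is configured. A minimal, self-contained sketch under those assumptions; the role ARN, region, and session name below are placeholder values, not taken from the source:

```
import java.util.Objects;
import java.util.Properties;

// Sketch of the JAAS config string the diff builds for AWS_MSK_IAM role auth.
// The optional awsRoleSessionName segment is appended only when configured.
final class IamJaasConfigSketch {
    static String buildJaasConfig(final String stsRoleArn, final String stsRegion, final String sessionName) {
        String config = String.format(
                "software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn=\"%s\" awsStsRegion=\"%s\"",
                stsRoleArn, stsRegion);
        if (Objects.nonNull(sessionName)) {
            config += String.format(" awsRoleSessionName=\"%s\"", sessionName);
        }
        return config + ";"; // JAAS entries are semicolon-terminated
    }

    public static void main(String[] args) {
        final Properties properties = new Properties();
        // Placeholder values for illustration only.
        properties.put("sasl.jaas.config",
                buildJaasConfig("arn:aws:iam::123456789012:role/example", "us-east-1", "data-prepper"));
        System.out.println(properties.getProperty("sasl.jaas.config"));
    }
}
```

Building the string once and conditionally extending it avoids the duplicated concatenation branches the old code would have needed for the session-name case.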