Kafka sink (#3127)
* -Support for kafka-sink
Signed-off-by: rajeshLovesToCode <[email protected]>

rajeshLovesToCode authored Aug 26, 2023
1 parent 8114ab4 commit b5e38e3
Showing 21 changed files with 1,904 additions and 100 deletions.
311 changes: 311 additions & 0 deletions data-prepper-plugins/kafka-plugins/README-sink.md

Large diffs are not rendered by default.
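Since the README diff itself is not rendered, here is a rough, hypothetical sketch of how the new sink might be wired into a pipeline. The option names are only inferred from the configuration getters exercised in the integration test below (bootstrap servers, topic, serde format, schema registry, SASL/PLAIN credentials) and all values are placeholders; README-sink.md is the authoritative reference.

sink:
  - kafka:
      bootstrap_servers: ["localhost:9092"]
      topic:
        name: test-topic
      serde_format: avro
      schema:
        registry_url: http://localhost:8081
      authentication:
        sasl:
          plaintext:
            username: kafka-user
            password: kafka-secret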

5 changes: 5 additions & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -48,6 +48,8 @@ dependencies {
implementation 'org.apache.kafka:connect-json:3.4.0'
implementation 'com.github.fge:json-schema-validator:2.2.14'
implementation 'commons-collections:commons-collections:3.2.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'


}
@@ -89,6 +91,9 @@ task integrationTest(type: Test) {
systemProperty 'tests.kafka.glue_avro_schema_name', System.getProperty('tests.kafka.glue_avro_schema_name')
systemProperty 'tests.msk.region', System.getProperty('tests.msk.region')
systemProperty 'tests.msk.arn', System.getProperty('tests.msk.arn')
systemProperty 'tests.kafka.confluent.registry_url', System.getProperty('tests.kafka.confluent.registry_url')
systemProperty 'tests.kafka.authconfig.username', System.getProperty('tests.kafka.authconfig.username')
systemProperty 'tests.kafka.authconfig.password', System.getProperty('tests.kafka.authconfig.password')

filter {
includeTestsMatching '*IT'
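These properties feed the new Avro/SASL integration test. As a hedged example (the Gradle project path is assumed to mirror the directory layout, and the endpoints and credentials are placeholders), the tests might be run locally with:

./gradlew :data-prepper-plugins:kafka-plugins:integrationTest \
  -Dtests.kafka.bootstrap_servers=localhost:9092 \
  -Dtests.kafka.confluent.registry_url=http://localhost:8081 \
  -Dtests.kafka.authconfig.username=kafka-user \
  -Dtests.kafka.authconfig.password=kafka-secret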
New file: KafkaSinkAvroTypeIT.java (data-prepper-plugins/kafka-plugins)
@@ -0,0 +1,294 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.sink;

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class KafkaSinkAvroTypeIT {
private static final int TEST_ID = 123456;

@Mock
SchemaConfig schemaConfig;

@Mock
private KafkaSinkConfig kafkaSinkConfig;

@Mock
private TopicConfig topicConfig;

private KafkaSink kafkaSink;

private String bootstrapServers;
private String testTopic;

private PluginSetting pluginSetting;

@Mock
private PluginFactory pluginFactory;

private SinkContext sinkContext;

private String registryUrl;

@Mock
private DlqProvider dlqProvider;

@Mock
private DlqWriter dlqWriter;

@Mock
private ExpressionEvaluator evaluator;

private AuthConfig authConfig;
private AuthConfig.SaslAuthConfig saslAuthConfig;
private PlainTextAuthConfig plainTextAuthConfig;
private static final Properties props = new Properties();


public KafkaSink createObjectUnderTest() {
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext);
}

@BeforeEach
public void setup() throws RestClientException, IOException {
plainTextAuthConfig = mock(PlainTextAuthConfig.class);
authConfig = mock(AuthConfig.class);
saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class);
schemaConfig = mock(SchemaConfig.class);
evaluator = mock(ExpressionEvaluator.class);
dlqWriter = mock(DlqWriter.class);
dlqProvider = mock(DlqProvider.class);
sinkContext = mock(SinkContext.class);
pluginFactory = mock(PluginFactory.class);
pluginSetting = mock(PluginSetting.class);
when(pluginSetting.getName()).thenReturn("name");
when(pluginSetting.getPipelineName()).thenReturn("pipelinename");

when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));

kafkaSinkConfig = mock(KafkaSinkConfig.class);

registryUrl = System.getProperty("tests.kafka.confluent.registry_url");
when(schemaConfig.getRegistryURL()).thenReturn(registryUrl);
String schemaStr = "{\"type\": \"record\", \"name\": \"Example\",\"fields\": " +
"[{\"name\": \"name\",\"type\": \"string\"}," +
"{\"name\": \"id\",\"type\": \"string\"} ]}";
Schema schema = new Schema.Parser().parse(schemaStr);

when(schemaConfig.getInlineSchema()).thenReturn(schemaStr);
when(schemaConfig.isCreate()).thenReturn(true);
when(kafkaSinkConfig.getSchemaConfig()).thenReturn(schemaConfig);

when(kafkaSinkConfig.getSerdeFormat()).thenReturn(MessageFormat.AVRO.toString());
when(kafkaSinkConfig.getPartitionKey()).thenReturn("test-${name}");

final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(5);
testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5);

topicConfig = mock(TopicConfig.class);
when(topicConfig.getName()).thenReturn(testTopic);
when(topicConfig.getGroupId()).thenReturn(testGroup);
when(topicConfig.getWorkers()).thenReturn(1);
when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(45));
when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(topicConfig.getAutoCommit()).thenReturn(false);
when(topicConfig.getAutoOffsetReset()).thenReturn("earliest");
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
// commit interval is read when the consumer properties are built in consumeTestMessages
when(topicConfig.getCommitInterval()).thenReturn(Duration.ofSeconds(5));

bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
when(kafkaSinkConfig.getBootStrapServers()).thenReturn(Collections.singletonList(bootstrapServers));

props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

}
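
// End-to-end flow of this integration test: configure SASL/PLAIN credentials and client
// properties, create the test topic, initialize the sink and publish Avro-encoded records,
// then consume them back with KafkaAvroDeserializer and verify the round-tripped values
// before deleting the topic.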

@Test
public void testPollRecordsAvroSASLPlainText() throws Exception {

configureJaasConfForSASLPlainText();

final int numRecords = 1;
when(topicConfig.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(topicConfig.isCreate()).thenReturn(false);
when(kafkaSinkConfig.getTopic()).thenReturn(topicConfig);

when(kafkaSinkConfig.getAuthConfig()).thenReturn(authConfig);
kafkaSink = createObjectUnderTest();

AtomicBoolean created = new AtomicBoolean(false);
final String topicName = topicConfig.getName();

createTopic(created, topicName);

final List<Record<Event>> records = new ArrayList<>();

for (int i = 0; i < numRecords; i++) {
final Map<String, String> eventData = new HashMap<>();
eventData.put("name", "testName");
eventData.put("id", "" + TEST_ID + i);
final JacksonEvent event = JacksonLog.builder().withData(eventData).build();
records.add(new Record<>(event));
}

kafkaSink.doInitialize();
kafkaSink.doOutput(records);

Thread.sleep(4000);

consumeTestMessages(records);

deleteTopic(created, topicName);
}

private void configureJaasConfForSASLPlainText() {
String username = System.getProperty("tests.kafka.authconfig.username");
String password = System.getProperty("tests.kafka.authconfig.password");
when(plainTextAuthConfig.getUsername()).thenReturn(username);
when(plainTextAuthConfig.getPassword()).thenReturn(password);
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);


String jaasConf = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConf);
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT.toString());
}
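
// Topic creation and deletion go through the Kafka AdminClient, reusing the shared props
// (bootstrap servers plus the SASL settings above) and waiting up to 30 seconds for the
// operation to complete.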

private void createTopic(AtomicBoolean created, String topicName) throws InterruptedException {
try (AdminClient adminClient = AdminClient.create(props)) {
try {
adminClient.createTopics(
Collections.singleton(new NewTopic(topicName, 1, (short) 1)))
.all().get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
created.set(true);
}
while (!created.get()) {
Thread.sleep(1000);
}
}

private void deleteTopic(AtomicBoolean created, String topicName) throws InterruptedException {
try (AdminClient adminClient = AdminClient.create(props)) {
try {
adminClient.deleteTopics(Collections.singleton(topicName))
.all().get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
created.set(false);
}
while (created.get()) {
Thread.sleep(1000);
}
}
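
// Builds the consumer on top of the same shared props (bootstrap servers and SASL settings),
// adding the group id, offset reset, String key / Avro value deserializers, and the schema
// registry URL before subscribing to the test topic.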

private void consumeTestMessages(List<Record<Event>> recList) {

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
topicConfig.getCommitInterval().toSecondsPart());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
topicConfig.getAutoCommit());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
props.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class);
props.put("schema.registry.url", schemaConfig.getRegistryURL());

KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer<>(props);

kafkaConsumer.subscribe(Arrays.asList(topicConfig.getName()));

pollRecords(recList, kafkaConsumer);
}
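
// Polls until every published record has been matched: each consumed Avro record's string
// form (with whitespace stripped) is expected to contain the corresponding input event's JSON.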

private void pollRecords(List<Record<Event>> recList, KafkaConsumer<String, GenericRecord> kafkaConsumer) {
int recListCounter = 0;
boolean isPollNext = true;
while (isPollNext) {
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
for (ConsumerRecord<String, GenericRecord> record : records) {
Record<Event> recordEvent = recList.get(recListCounter);
String inputJsonStr = recordEvent.getData().toJsonString();

GenericRecord recValue = record.value();
String recValueStr = recValue.toString().replaceAll("\\s", "");
assertThat(recValueStr, CoreMatchers.containsString(inputJsonStr));
if (recListCounter + 1 == recList.size()) {
isPollNext = false;
}
recListCounter++;
break;
}
}
}
}
}
