
Commit

Refactor Kafka Source and Sink, implement basic Kafka buffer (opensearch-project#3354)

* Refactor Kafka Source and Sink to make Kafka producer and consumer logic more reusable

Signed-off-by: Jonah Calvo <[email protected]>

* Implement requested changes and a simple Kafka buffer

Signed-off-by: Jonah Calvo <[email protected]>

* Clean up logs, add TODOs, etc.

Signed-off-by: Jonah Calvo <[email protected]>

* Add support for MSK in the Kafka buffer

Signed-off-by: Jonah Calvo <[email protected]>

* Change topics to a list for now

Signed-off-by: Jonah Calvo <[email protected]>

* Update config YAML names

Signed-off-by: Jonah Calvo <[email protected]>

* Fix unit tests

Signed-off-by: Jonah Calvo <[email protected]>

---------

Signed-off-by: Jonah Calvo <[email protected]>
JonahCalvo authored Oct 6, 2023
1 parent d3a027a commit b0d253c
Showing 43 changed files with 1,009 additions and 161 deletions.
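
Taken together, the diffs below rename getBootStrapServers() to getBootstrapServers() (which now returns a list), turn the topic setting into a list, promote EncryptionConfig to a top-level class, and add a role_session_name to the AWS settings. The sketch below parses a guessed pipeline-YAML fragment reflecting those renames; apart from sts_role_arn and role_session_name, which appear verbatim in this diff, every key name is an assumption inferred from the getters, not the plugin's documented schema.

```java
// Parses a guessed pipeline-YAML fragment so its shape is checkable; requires
// the jackson-dataformat-yaml module. Key names other than sts_role_arn and
// role_session_name are assumptions, not the plugin's documented schema.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public class KafkaConfigShapeSketch {
    public static void main(String[] args) throws Exception {
        final String yaml =
                "bootstrap_servers:\n" +      // now a list of servers
                "  - localhost:9092\n" +
                "topics:\n" +                 // "Change topics to a list for now"
                "  - name: buffer-topic\n" +  // hypothetical per-topic key
                "encryption:\n" +
                "  type: SSL\n" +
                "  insecure: false\n" +
                "aws:\n" +
                "  sts_role_arn: arn:aws:iam::123456789012:role/example\n" +
                "  role_session_name: data-prepper\n";
        final JsonNode parsed = new ObjectMapper(new YAMLFactory()).readTree(yaml);
        System.out.println(parsed.toPrettyString());
    }
}
```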
@@ -177,7 +177,7 @@ protected int getRecordsInFlight() {
*
* @param recordsInBuffer the current number of records in the buffer
*/
-protected void postProcess(final Long recordsInBuffer) {
+public void postProcess(final Long recordsInBuffer) {

}
}
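
This protected-to-public change, repeated in the next two hunks, is what allows the new KafkaBuffer added later in this commit to forward postProcess to a separately held inner buffer. A minimal sketch of the Java access rule involved, with all class and package names invented for illustration:

```java
// file: api/BaseBuffer.java (hypothetical names throughout)
package api;

public abstract class BaseBuffer {
    protected void postProcess(final Long recordsInBuffer) {
    }
}
```

```java
// file: plugin/DelegatingBuffer.java
package plugin;

import api.BaseBuffer;

public class DelegatingBuffer extends BaseBuffer {
    private final BaseBuffer inner;

    public DelegatingBuffer(final BaseBuffer inner) {
        this.inner = inner;
    }

    public void report(final long recordsInBuffer) {
        // Does not compile while postProcess is protected: outside the declaring
        // package, a subclass may access the protected member only through a
        // reference of its own type (JLS 6.6.2.1), and `inner` is typed as the
        // superclass. Making postProcess public removes the restriction.
        inner.postProcess(recordsInBuffer);
    }
}
```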
@@ -137,7 +137,7 @@ private T pollForBufferEntry(final int timeoutValue, final TimeUnit timeoutUnit)
}

@Override
-protected void postProcess(final Long recordsInBuffer) {
+public void postProcess(final Long recordsInBuffer) {
// adding bounds to address race conditions and reporting negative buffer usage
final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue();
final Double boundedTotalRecords = nonNegativeTotalRecords > bufferSize ? bufferSize : nonNegativeTotalRecords;
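
The two ternaries clamp the reported record count into the range [0, bufferSize] (bufferCapacity in the next hunk) before it reaches the usage gauge. An equivalent one-liner, offered as a sketch only rather than the committed code:

```java
// Clamp recordsInBuffer into [0, bufferSize] with Math.min/Math.max.
final double boundedTotalRecords = Math.max(0d, Math.min((double) bufferSize, recordsInBuffer.doubleValue()));
```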
@@ -197,7 +197,7 @@ public static PluginSetting getDefaultPluginSettings() {
}

@Override
-protected void postProcess(final Long recordsInBuffer) {
+public void postProcess(final Long recordsInBuffer) {
// adding bounds to address race conditions and reporting negative buffer usage
final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue();
final Double boundedTotalRecords = nonNegativeTotalRecords > bufferCapacity ? bufferCapacity : nonNegativeTotalRecords;
data-prepper-plugins/kafka-plugins/README-sink.md (2 changes: 1 addition & 1 deletion)
@@ -274,7 +274,7 @@ bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start Kafka Server with the following configuration
Configuration in config/server.properties
```
-isteners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
+listeners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
data-prepper-plugins/kafka-plugins/README.md (2 changes: 1 addition & 1 deletion)
@@ -17,7 +17,7 @@ bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start Kafka Server with the following configuration
Configuration in config/server.properties
```
-isteners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
+listeners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
data-prepper-plugins/kafka-plugins/build.gradle (1 change: 1 addition & 0 deletions)
@@ -10,6 +10,7 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
+implementation project(':data-prepper-plugins:blocking-buffer')
implementation 'org.apache.kafka:kafka-clients:3.4.0'
implementation 'org.apache.avro:avro:1.11.0'
implementation 'com.fasterxml.jackson.core:jackson-databind'
@@ -155,7 +155,7 @@ public void setup() throws RestClientException, IOException {
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));

bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
-when(kafkaSinkConfig.getBootStrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
+when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));

props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

@@ -132,7 +132,7 @@ public void setup() {
when(topicConfig.getAutoOffsetReset()).thenReturn("earliest");
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
-when(kafkaSinkConfig.getBootStrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
+when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}

@@ -253,6 +253,7 @@ private void pollRecords(List<Record<Event>> recList, KafkaConsumer<String, Json

JsonNode recValue = record.value();
String ss = recValue.asText();

assertThat(ss, CoreMatchers.containsString(inputJsonStr));
if (recListCounter + 1 == recList.size()) {
isPollNext = false;
@@ -130,7 +130,7 @@ public void setup() {
when(topicConfig.getAutoOffsetReset()).thenReturn("earliest");
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
-when(kafkaSinkConfig.getBootStrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
+when(kafkaSinkConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}

@@ -24,6 +24,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
@@ -70,7 +71,7 @@ public class KafkaSourceJsonTypeIT {
private List<TopicConfig> topicList;

@Mock
-private KafkaSourceConfig.EncryptionConfig encryptionConfig;
+private EncryptionConfig encryptionConfig;

@Mock
private TopicConfig jsonTopic;
@@ -96,7 +97,7 @@ public void setup() {
pluginMetrics = mock(PluginMetrics.class);
counter = mock(Counter.class);
buffer = mock(Buffer.class);
-encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
+encryptionConfig = mock(EncryptionConfig.class);
receivedRecords = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(2);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
@@ -128,7 +129,7 @@ public void setup() {
when(jsonTopic.getAutoOffsetReset()).thenReturn("earliest");
when(jsonTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
-when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers);
+when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
}

@@ -23,6 +23,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
@@ -77,7 +78,7 @@ public class KafkaSourceMultipleAuthTypeIT {
private PlainTextAuthConfig plainTextAuthConfig;

@Mock
-private KafkaSourceConfig.EncryptionConfig encryptionConfig;
+private EncryptionConfig encryptionConfig;

private TopicConfig jsonTopic;
private TopicConfig avroTopic;
@@ -105,7 +106,7 @@ public void setup() {
pluginMetrics = mock(PluginMetrics.class);
counter = mock(Counter.class);
buffer = mock(Buffer.class);
-encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
+encryptionConfig = mock(EncryptionConfig.class);
receivedRecords = new ArrayList<>();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
pipelineDescription = mock(PipelineDescription.class);
@@ -140,7 +141,7 @@ public void setup() {
sslBootstrapServers = System.getProperty("tests.kafka.ssl_bootstrap_servers");
kafkaUsername = System.getProperty("tests.kafka.username");
kafkaPassword = System.getProperty("tests.kafka.password");
-when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers);
+when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
}

@@ -205,7 +206,7 @@ public void TestPlainTextWithAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() thr
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
-when(sourceConfig.getBootStrapServers()).thenReturn(saslplainBootstrapServers);
+when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(saslplainBootstrapServers));
kafkaSource = createObjectUnderTest();

Properties props = new Properties();
@@ -256,7 +257,7 @@ public void TestPlainTextWithNoAuthKafkaEncryptionWithNoAuthSchemaRegistry() thr
when(encryptionConfig.getInsecure()).thenReturn(true);
when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL);
when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
-when(sourceConfig.getBootStrapServers()).thenReturn(sslBootstrapServers);
+when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(sslBootstrapServers));
when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
kafkaSource = createObjectUnderTest();

@@ -312,7 +313,7 @@ public void TestPlainTextWithAuthKafkaEncryptionWithNoAuthSchemaRegistry() throw
when(encryptionConfig.getInsecure()).thenReturn(true);
when(encryptionConfig.getType()).thenReturn(EncryptionType.SSL);
when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
-when(sourceConfig.getBootStrapServers()).thenReturn(saslsslBootstrapServers);
+when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(saslsslBootstrapServers));
when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
kafkaSource = createObjectUnderTest();

@@ -33,6 +33,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.MskBrokerConnectionType;
@@ -97,7 +98,7 @@ public class MskGlueRegistryMultiTypeIT {
private AwsConfig.AwsMskConfig awsMskConfig;

@Mock
-private KafkaSourceConfig.EncryptionConfig encryptionConfig;
+private EncryptionConfig encryptionConfig;

private KafkaSource kafkaSource;
private TopicConfig jsonTopic;
@@ -180,8 +181,8 @@ public void setup() {
testAvroSchemaName = System.getProperty("tests.kafka.glue_avro_schema_name");
testMskArn = System.getProperty("tests.msk.arn");
testMskRegion = System.getProperty("tests.msk.region");
-when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers);
-encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
+when(sourceConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
+encryptionConfig = mock(EncryptionConfig.class);
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
}
@@ -0,0 +1,91 @@
package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

@DataPrepperPlugin(name = "kafka", pluginType = Buffer.class, pluginConfigurationType = KafkaBufferConfig.class)
public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer.class);
    public static final int INNER_BUFFER_CAPACITY = 1000000;
    public static final int INNER_BUFFER_BATCH_SIZE = 250000;
    private final KafkaCustomProducer producer;
    private final AbstractBuffer innerBuffer;
    private final ExecutorService executorService;

    @DataPrepperPluginConstructor
    public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory,
                       final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics) {
        super(pluginSetting);
        final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory();
        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null);
        final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory();
        // Reads are served from an in-memory blocking buffer that the Kafka consumers fill.
        innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
        final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
                innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false));
        // One thread per consumer keeps the inner buffer supplied from the topic.
        this.executorService = Executors.newFixedThreadPool(consumers.size());
        consumers.forEach(this.executorService::submit);
    }

    @Override
    public void doWrite(T record, int timeoutInMillis) throws TimeoutException {
        try {
            // Writes bypass the inner buffer and go straight to the Kafka topic.
            producer.produceRecords(record);
        } catch (final Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void doWriteAll(Collection<T> records, int timeoutInMillis) throws Exception {
        for (final T record : records) {
            doWrite(record, timeoutInMillis);
        }
    }

    @Override
    public Map.Entry<Collection<T>, CheckpointState> doRead(int timeoutInMillis) {
        return innerBuffer.doRead(timeoutInMillis);
    }

    @Override
    public void postProcess(final Long recordsInBuffer) {
        innerBuffer.postProcess(recordsInBuffer);
    }

    @Override
    public void doCheckpoint(CheckpointState checkpointState) {
        innerBuffer.doCheckpoint(checkpointState);
    }

    @Override
    public boolean isEmpty() {
        // TODO: check Kafka topic is empty as well.
        return innerBuffer.isEmpty();
    }
}
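
The class is deliberately asymmetric: writes are produced straight to the Kafka topic, while reads, checkpoints, and stats are delegated to the in-memory BlockingBuffer that the consumer threads fill. A rough sketch of the resulting data path, using the method names from the class above:

```java
// upstream source  --write()--> KafkaBuffer.doWrite --produceRecords()--> Kafka topic
// Kafka topic      --KafkaCustomConsumer threads--> innerBuffer (BlockingBuffer)
// downstream sinks --read()--> KafkaBuffer.doRead --> innerBuffer.doRead
```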
@@ -42,6 +42,9 @@ public MskBrokerConnectionType getBrokerConnectionType() {
@JsonProperty("sts_role_arn")
private String stsRoleArn;

@JsonProperty("role_session_name")
private String stsRoleSessionName;

public AwsMskConfig getAwsMskConfig() {
return awsMskConfig;
}
@@ -53,4 +56,8 @@ public String getRegion() {
public String getStsRoleArn() {
return stsRoleArn;
}

+public String getStsRoleSessionName() {
+return stsRoleSessionName;
+}
}
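
The new role_session_name setting pairs with the existing sts_role_arn. A hedged sketch of how such a session name is typically applied when assuming the role with the AWS SDK for Java v2; the plugin's actual wiring may differ:

```java
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

public class StsSessionNameSketch {
    // Builds credentials that carry the configured session name into the
    // assumed-role session (visible in CloudTrail and the session ARN).
    public static AwsCredentialsProvider credentialsFor(final AwsConfig awsConfig) {
        final AssumeRoleRequest request = AssumeRoleRequest.builder()
                .roleArn(awsConfig.getStsRoleArn())
                .roleSessionName(awsConfig.getStsRoleSessionName()) // new getter from this diff
                .build();
        return StsAssumeRoleCredentialsProvider.builder()
                .stsClient(StsClient.create())
                .refreshRequest(request)
                .build();
    }
}
```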
@@ -0,0 +1,19 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;

public class EncryptionConfig {
    @JsonProperty("type")
    private EncryptionType type = EncryptionType.SSL;

    @JsonProperty("insecure")
    private boolean insecure = false;

    public EncryptionType getType() {
        return type;
    }

    public boolean getInsecure() {
        return insecure;
    }
}
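
Promoting EncryptionConfig from an inner class of KafkaSourceConfig to this top-level type is what lets the source, sink, and new buffer share it, as the import changes earlier in the diff show. A small sketch of how the two keys deserialize, assuming Jackson's YAML module handles the pipeline configuration:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;

public class EncryptionConfigSketch {
    public static void main(String[] args) throws Exception {
        // Both keys default sensibly: type to SSL, insecure to false.
        final String yaml = "type: SSL\ninsecure: true\n";
        final EncryptionConfig config = new ObjectMapper(new YAMLFactory())
                .readValue(yaml, EncryptionConfig.class);
        System.out.println(config.getType() + ", insecure=" + config.getInsecure());
    }
}
```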