Skip to content

Commit

Permalink
Encrypted and decrypt data in the Kafka buffer when the user configur…
Browse files Browse the repository at this point in the history
…es. Use a KMS key to decrypt the data encryption key, if one is provided. Resolves opensearch-project#3422

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Oct 10, 2023
1 parent 68875c4 commit 70b5765
Show file tree
Hide file tree
Showing 39 changed files with 1,120 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Provides a standard model for requesting AWS credentials.
*/
public class AwsCredentialsOptions {
private static final AwsCredentialsOptions DEFAULT_OPTIONS = new AwsCredentialsOptions();
private final String stsRoleArn;
private final String stsExternalId;
private final Region region;
Expand All @@ -27,6 +28,13 @@ private AwsCredentialsOptions(final Builder builder) {
this.stsHeaderOverrides = builder.stsHeaderOverrides != null ? new HashMap<>(builder.stsHeaderOverrides) : Collections.emptyMap();
}

private AwsCredentialsOptions() {
this.stsRoleArn = null;
this.stsExternalId = null;
this.region = null;
this.stsHeaderOverrides = Collections.emptyMap();
}

/**
* Constructs a new {@link Builder} to build the credentials
* options.
Expand All @@ -37,6 +45,10 @@ public static Builder builder() {
return new Builder();
}

public static AwsCredentialsOptions defaultOptions() {
return DEFAULT_OPTIONS;
}

public String getStsRoleArn() {
return stsRoleArn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import org.junit.jupiter.params.provider.ValueSource;
import software.amazon.awssdk.regions.Region;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;

class AwsCredentialsOptionsTest {
Expand Down Expand Up @@ -131,4 +133,21 @@ void with_StsHeaderOverrides() {
assertThat(awsCredentialsOptions.getStsHeaderOverrides().size(), equalTo(stsHeaderOverrides.size()));
assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides));
}

@Test
void defaultOptions_returns_with_null_or_empty_values() {
AwsCredentialsOptions defaultOptions = AwsCredentialsOptions.defaultOptions();

assertThat(defaultOptions, notNullValue());
assertThat(defaultOptions.getRegion(), nullValue());
assertThat(defaultOptions.getStsRoleArn(), nullValue());
assertThat(defaultOptions.getStsExternalId(), nullValue());
assertThat(defaultOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap()));
}

@Test
void defaultOptions_returns_same_instance_on_multiple_calls() {
assertThat(AwsCredentialsOptions.defaultOptions(),
sameInstance(AwsCredentialsOptions.defaultOptions()));
}
}
8 changes: 5 additions & 3 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'org.apache.kafka:kafka-clients:3.4.0'
implementation libs.avro.core
implementation 'com.fasterxml.jackson.core:jackson-databind'
Expand All @@ -21,10 +22,11 @@ dependencies {
implementation 'io.confluent:kafka-avro-serializer:7.3.3'
implementation 'io.confluent:kafka-schema-registry-client:7.3.3'
implementation 'io.confluent:kafka-schema-registry:7.3.3:tests'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:kafka'
implementation 'software.amazon.awssdk:kms'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.6'
implementation 'software.amazon.awssdk:sts:2.20.103'
implementation 'software.amazon.awssdk:auth:2.20.103'
implementation 'software.amazon.awssdk:kafka:2.20.103'
implementation 'software.amazon.glue:schema-registry-serde:1.1.15'
implementation 'com.amazonaws:aws-java-sdk-glue:1.12.506'
implementation 'io.confluent:kafka-json-schema-serializer:7.4.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand All @@ -45,8 +49,11 @@ public class KafkaBufferIT {
private PluginFactory pluginFactory;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private TopicConfig topicConfig;

private PluginMetrics pluginMetrics;
private String bootstrapServersCommaDelimited;

@BeforeEach
void setUp() {
Expand All @@ -57,7 +64,6 @@ void setUp() {
MessageFormat messageFormat = MessageFormat.JSON;

String topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5);
TopicConfig topicConfig = mock(TopicConfig.class);
when(topicConfig.getName()).thenReturn(topicName);
when(topicConfig.getGroupId()).thenReturn("buffergroup-" + RandomStringUtils.randomAlphabetic(6));
when(topicConfig.isCreate()).thenReturn(true);
Expand All @@ -76,16 +82,16 @@ void setUp() {

EncryptionConfig encryptionConfig = mock(EncryptionConfig.class);

String bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers");

LOG.info("Using Kafka bootstrap servers: {}", bootstrapServers);
LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited);

when(kafkaBufferConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServers));
when(kafkaBufferConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServersCommaDelimited));
when(kafkaBufferConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
}

private KafkaBuffer<Record<Event>> createObjectUnderTest() {
return new KafkaBuffer<>(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics);
return new KafkaBuffer<>(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, null);
}

@Test
Expand All @@ -110,8 +116,40 @@ void write_and_read() throws TimeoutException {
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}

@Test
void write_and_read_encrypted() throws TimeoutException, NoSuchAlgorithmException {
when(topicConfig.getEncryptionKey()).thenReturn(createAesKey());

KafkaBuffer<Record<Event>> objectUnderTest = createObjectUnderTest();

Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
// TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
//assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}

private Record<Event> createRecord() {
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
return new Record<>(event);
}

private static String createAesKey() throws NoSuchAlgorithmException {
KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES");
aesKeyGenerator.init(256);
SecretKey secretKey = aesKeyGenerator.generateKey();
byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded());
return new String(base64Bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -95,14 +96,17 @@ public class KafkaSinkAvroTypeIT {
@Mock
private ExpressionEvaluator evaluator;

@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

private AuthConfig authConfig;
private AuthConfig.SaslAuthConfig saslAuthConfig;
private PlainTextAuthConfig plainTextAuthConfig;
private static final Properties props = new Properties();


public KafkaSink createObjectUnderTest() {
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext);
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext, awsCredentialsSupplier);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -85,6 +86,9 @@ public class KafkaSinkJsonTypeIT {
@Mock
private ExpressionEvaluator evaluator;

@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

private PlainTextAuthConfig plainTextAuthConfig;
private AuthConfig.SaslAuthConfig saslAuthConfig;
private AuthConfig authConfig;
Expand All @@ -93,7 +97,7 @@ public class KafkaSinkJsonTypeIT {


public KafkaSink createObjectUnderTest() {
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext);
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext, awsCredentialsSupplier);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -83,6 +84,9 @@ public class KafkaSinkPlainTextTypeIT {
@Mock
private ExpressionEvaluator evaluator;

@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

private PlainTextAuthConfig plainTextAuthConfig;
private AuthConfig.SaslAuthConfig saslAuthConfig;
private AuthConfig authConfig;
Expand All @@ -91,7 +95,7 @@ public class KafkaSinkPlainTextTypeIT {


public KafkaSink createObjectUnderTest() {
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext);
return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext, awsCredentialsSupplier);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand Down Expand Up @@ -42,12 +43,13 @@ public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {

@DataPrepperPluginConstructor
public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory,
final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics){
final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);
SerializationFactory serializationFactory = new SerializationFactory();
final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory);
final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier);
producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null);
final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory);
final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.util.function.Supplier;

/**
* An interface representing important data for how the data going to or coming from
* Kafka should be represented.
*/
public interface KafkaDataConfig {
MessageFormat getSerdeFormat();
Supplier<byte[]> getEncryptionKeySupplier();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opensearch.dataprepper.plugins.kafka.common;

import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.util.function.Supplier;

/**
* Adapts a {@link TopicConfig} to a {@link KafkaDataConfig}.
*/
public class KafkaDataConfigAdapter implements KafkaDataConfig {
private final KeyFactory keyFactory;
private final TopicConfig topicConfig;

public KafkaDataConfigAdapter(KeyFactory keyFactory, TopicConfig topicConfig) {
this.keyFactory = keyFactory;
this.topicConfig = topicConfig;
}

@Override
public MessageFormat getSerdeFormat() {
return topicConfig.getSerdeFormat();
}

@Override
public Supplier<byte[]> getEncryptionKeySupplier() {
if(topicConfig.getEncryptionKey() == null)
return null;
return keyFactory.getKeySupplier(topicConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.util.function.Supplier;

public class PlaintextKafkaDataConfig implements KafkaDataConfig {
private final KafkaDataConfig dataConfig;

private PlaintextKafkaDataConfig(KafkaDataConfig dataConfig) {
this.dataConfig = dataConfig;
}

/**
* Gets similar {@link KafkaDataConfig} as the given one, but uses {@link MessageFormat#PLAINTEXT} for
* the serialization/deserialization format.
Expand All @@ -11,11 +19,16 @@ public class PlaintextKafkaDataConfig implements KafkaDataConfig {
* @return A {@link KafkaDataConfig} with the PLAINTEXT message format.
*/
public static KafkaDataConfig plaintextDataConfig(final KafkaDataConfig dataConfig) {
return new PlaintextKafkaDataConfig();
return new PlaintextKafkaDataConfig(dataConfig);
}

@Override
public MessageFormat getSerdeFormat() {
return MessageFormat.PLAINTEXT;
}

@Override
public Supplier<byte[]> getEncryptionKeySupplier() {
return dataConfig.getEncryptionKeySupplier();
}
}
Loading

0 comments on commit 70b5765

Please sign in to comment.