diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index b0c752c200..a388d3ee6e 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -117,7 +117,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
     @Override
     public void start(Buffer<Record<Event>> buffer) {
         Properties authProperties = new Properties();
-        glueDeserializer = KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
+        KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
         sourceConfig.getTopics().forEach(topic -> {
             consumerGroupID = topic.getGroupId();
             KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
@@ -136,7 +136,8 @@ public void start(Buffer<Record<Event>> buffer) {
                             break;
                         case PLAINTEXT:
                         default:
-                            if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) {
+                            glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
+                            if (Objects.nonNull(glueDeserializer)) {
                                 kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
                             } else {
                                 kafkaConsumer = new KafkaConsumer(consumerProperties);
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java
index e9e612266f..77fcd6e2fc 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java
@@ -8,6 +8,7 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
 import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
@@ -83,6 +84,7 @@ public class KafkaSourceSecurityConfigurer {
     private static final int MAX_KAFKA_CLIENT_RETRIES = 360; // for one hour every 10 seconds
 
     private static AwsCredentialsProvider credentialsProvider;
+    private static GlueSchemaRegistryKafkaDeserializer glueDeserializer;
 
 
     /*public static void setSaslPlainTextProperties(final KafkaSourceConfig kafkaSourConfig,
@@ -245,12 +247,11 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
         }
     }
 
-    public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) {
+    public static void setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) {
         final AwsConfig awsConfig = sourceConfig.getAwsConfig();
         final AuthConfig authConfig = sourceConfig.getAuthConfig();
         final KafkaSourceConfig.EncryptionConfig encryptionConfig = sourceConfig.getEncryptionConfig();
         final EncryptionType encryptionType = encryptionConfig.getType();
-        GlueSchemaRegistryKafkaDeserializer glueDeserializer = null;
 
         credentialsProvider = DefaultCredentialsProvider.create();
 
@@ -283,15 +284,6 @@ public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties p
                     properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
                 }
             }
-        if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) {
-            Map<String, Object> configs = new HashMap<>();
-            configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
-            configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
-            configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
-            configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10");
-            configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
-            glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs);
-        }
         if (Objects.isNull(authConfig) || Objects.isNull(authConfig.getSaslAuthConfig())) {
             if (encryptionType == EncryptionType.SSL) {
                 properties.put(SECURITY_PROTOCOL, "SSL");
@@ -301,7 +293,22 @@ public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties p
             throw new RuntimeException("Bootstrap servers are not specified");
         }
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    }
+
+    public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaSourceConfig sourceConfig) {
+        SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
+        if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) {
+            return null;
+        }
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
+        configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
+        configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
+        configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10");
+        configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
+        glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs);
         return glueDeserializer;
     }
 
+
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
index be868b3e6f..05843ed1a9 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
@@ -13,6 +13,8 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
+import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 
 import org.junit.jupiter.api.Assertions;
@@ -25,8 +27,10 @@
 import org.mockito.quality.Strictness;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Objects;
 import java.time.Duration;
 
 @ExtendWith(MockitoExtension.class)
@@ -37,6 +41,9 @@ class KafkaSourceTest {
     @Mock
     private KafkaSourceConfig sourceConfig;
 
+    @Mock
+    private KafkaSourceConfig.EncryptionConfig encryptionConfig;
+
     @Mock
     private PluginMetrics pluginMetrics;
 
@@ -64,6 +71,7 @@ public KafkaSource createObjectUnderTest() {
     @BeforeEach
     void setUp() throws Exception {
         sourceConfig = mock(KafkaSourceConfig.class);
+        encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
         pipelineDescription = mock(PipelineDescription.class);
         pluginMetrics = mock(PluginMetrics.class);
         acknowledgementSetManager = mock(AcknowledgementSetManager.class);
@@ -79,12 +87,21 @@ void setUp() throws Exception {
         when(topic2.getConsumerMaxPollRecords()).thenReturn(1);
         when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID);
         when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID);
+        when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
+        when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
+        when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
+        when(topic2.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
         when(topic1.getAutoCommit()).thenReturn(false);
+        when(topic1.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
+        when(topic2.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
         when(topic2.getAutoCommit()).thenReturn(false);
         when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10));
         when(topic2.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10));
         when(sourceConfig.getBootStrapServers()).thenReturn("http://localhost:1234");
         when(sourceConfig.getTopics()).thenReturn(Arrays.asList(topic1, topic2));
+        when(sourceConfig.getSchemaConfig()).thenReturn(null);
+        when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
+        when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);
     }
 /*
     @Test
@@ -108,4 +125,14 @@ void test_kafkaSource_start_execution_exception() {
         kafkaSource = createObjectUnderTest();
         Assertions.assertThrows(Exception.class, () -> kafkaSource.start(buffer));
     }
+
+    @Test
+    void test_kafkaSource_basicFunctionality() {
+        when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        kafkaSource = createObjectUnderTest();
+        assertTrue(Objects.nonNull(kafkaSource));
+        kafkaSource.start(buffer);
+        assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
+    }
 }