Skip to content

Commit

Permalink
Fix Null pointer exception when schema registry not specified (#3147)
Browse files Browse the repository at this point in the history
* Fix Null pointer exception when schema registry not specified

Signed-off-by: Krishna Kondaka <[email protected]>

* Fix failing test cases

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Aug 12, 2023
1 parent a5c4fe2 commit 28fdf90
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
@Override
public void start(Buffer<Record<Event>> buffer) {
Properties authProperties = new Properties();
glueDeserializer = KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = topic.getGroupId();
KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
Expand All @@ -136,7 +136,8 @@ public void start(Buffer<Record<Event>> buffer) {
break;
case PLAINTEXT:
default:
if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) {
glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
if (Objects.nonNull(glueDeserializer)) {
kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
} else {
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
Expand Down Expand Up @@ -83,6 +84,7 @@ public class KafkaSourceSecurityConfigurer {
private static final int MAX_KAFKA_CLIENT_RETRIES = 360; // for one hour every 10 seconds

private static AwsCredentialsProvider credentialsProvider;
private static GlueSchemaRegistryKafkaDeserializer glueDeserializer;


/*public static void setSaslPlainTextProperties(final KafkaSourceConfig kafkaSourConfig,
Expand Down Expand Up @@ -245,12 +247,11 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
}
}

public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) {
public static void setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) {
final AwsConfig awsConfig = sourceConfig.getAwsConfig();
final AuthConfig authConfig = sourceConfig.getAuthConfig();
final KafkaSourceConfig.EncryptionConfig encryptionConfig = sourceConfig.getEncryptionConfig();
final EncryptionType encryptionType = encryptionConfig.getType();
GlueSchemaRegistryKafkaDeserializer glueDeserializer = null;

credentialsProvider = DefaultCredentialsProvider.create();

Expand Down Expand Up @@ -283,15 +284,6 @@ public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties p
properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
}
}
if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) {
Map<String, Object> configs = new HashMap();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10");
configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs);
}
if (Objects.isNull(authConfig) || Objects.isNull(authConfig.getSaslAuthConfig())) {
if (encryptionType == EncryptionType.SSL) {
properties.put(SECURITY_PROTOCOL, "SSL");
Expand All @@ -301,7 +293,22 @@ public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties p
throw new RuntimeException("Bootstrap servers are not specified");
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}

public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaSourceConfig sourceConfig) {
SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) {
return null;
}
Map<String, Object> configs = new HashMap();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10");
configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs);
return glueDeserializer;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;

import org.junit.jupiter.api.Assertions;
Expand All @@ -25,8 +27,10 @@
import org.mockito.quality.Strictness;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.Objects;
import java.time.Duration;

@ExtendWith(MockitoExtension.class)
Expand All @@ -37,6 +41,9 @@ class KafkaSourceTest {
@Mock
private KafkaSourceConfig sourceConfig;

@Mock
private KafkaSourceConfig.EncryptionConfig encryptionConfig;

@Mock
private PluginMetrics pluginMetrics;

Expand Down Expand Up @@ -64,6 +71,7 @@ public KafkaSource createObjectUnderTest() {
@BeforeEach
void setUp() throws Exception {
sourceConfig = mock(KafkaSourceConfig.class);
encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
pipelineDescription = mock(PipelineDescription.class);
pluginMetrics = mock(PluginMetrics.class);
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
Expand All @@ -79,12 +87,21 @@ void setUp() throws Exception {
when(topic2.getConsumerMaxPollRecords()).thenReturn(1);
when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID);
when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID);
when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
when(topic2.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
when(topic1.getAutoCommit()).thenReturn(false);
when(topic1.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
when(topic2.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
when(topic2.getAutoCommit()).thenReturn(false);
when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10));
when(topic2.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10));
when(sourceConfig.getBootStrapServers()).thenReturn("http://localhost:1234");
when(sourceConfig.getTopics()).thenReturn(Arrays.asList(topic1, topic2));
when(sourceConfig.getSchemaConfig()).thenReturn(null);
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);
}

/* @Test
Expand All @@ -108,4 +125,14 @@ void test_kafkaSource_start_execution_exception() {
kafkaSource = createObjectUnderTest();
Assertions.assertThrows(Exception.class, () -> kafkaSource.start(buffer));
}

@Test
void test_kafkaSource_basicFunctionality() {
when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
kafkaSource = createObjectUnderTest();
assertTrue(Objects.nonNull(kafkaSource));
kafkaSource.start(buffer);
assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
}
}

0 comments on commit 28fdf90

Please sign in to comment.