diff --git a/CHANGELOG.md b/CHANGELOG.md index a4195c8..7a3a866 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [1.0.1] - 2024-10-24 +### Added +* Support for consumer properties allowing connecting to Kafka cloud provider. + ## [1.0.0] - 2023-04-27 ### Changed * Upgrade `Springboot` version from `2.3.3.RELEASE` to `2.7.10`. diff --git a/README.md b/README.md index deb6cde..61b68c6 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,26 @@ The table below describes all the available configuration values for Drone Fly. | instance.name | Instance name for a Drone Fly instance. `instance.name` is also used to derive the Kafka consumer group. Therefore, in a multi-instance deployment, a unique `instance.name` for each Drone Fly instance needs to be provided to avoid all instances ending up in the same Kafka consumer group. | `string` | `drone-fly` | no | | endpoint.port | Port on which Drone Fly Spring Boot app will start. | `string` | `8008` | no | +### Additional configuration parameters +The Kafka message reader supports properties that are passed to the Kafka consumer builder. +These are environment variables with the PREFIX apiary.messaging.consumer. + + #### Example for sending consumer parameters when using a Kafka cloud provider +- apiary.messaging.consumer.security.protocol=SSL +- apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM +- apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; +- apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler + +In this case we are sending the properties to Kafka's consumer to be able to connect to AWS MSK which also requires the IAM library included as a dependency in the POM.xml file + + java -Dloader.path=lib/ -jar drone-fly-app--exec.jar \ + --apiary.bootstrap.servers=localhost:9092 \ + --apiary.kafka.topic.name=apiary \ + --apiary.listener.list="com.expediagroup.sampleListener1,com.expediagroup.sampleListener2" \ + --apiary.messaging.consumer.security.protocol=SSL \ + --apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM \ + --apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \ + --apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler ## Metrics diff --git a/drone-fly-app/pom.xml b/drone-fly-app/pom.xml index e3792a5..0c4edf0 100644 --- a/drone-fly-app/pom.xml +++ b/drone-fly-app/pom.xml @@ -108,6 +108,11 @@ awaitility test + + software.amazon.msk + aws-msk-iam-auth + 1.1.1 + diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java index 55607e2..9a35844 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/context/CommonBeans.java @@ -16,6 +16,7 @@ package com.expediagroup.dataplatform.dronefly.app.context; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import org.apache.hadoop.hive.conf.HiveConf; @@ -24,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -32,10 +34,12 @@ import com.expediagroup.dataplatform.dronefly.app.messaging.MessageReaderAdapter; import com.expediagroup.dataplatform.dronefly.app.service.ListenerCatalog; import com.expediagroup.dataplatform.dronefly.app.service.factory.ListenerCatalogFactory; +import org.springframework.context.annotation.Primary; @Configuration public class CommonBeans { private static final Logger log = LoggerFactory.getLogger(CommonBeans.class); + public static final String CONSUMER_PROPERTIES_PREFIX = "apiary.messaging.consumer"; @Value("${instance.name:drone-fly}") private String instanceName; @@ -54,6 +58,13 @@ public HiveConf hiveConf() { return new HiveConf(); } + @Bean + @Primary + @ConfigurationProperties(CONSUMER_PROPERTIES_PREFIX) + public Properties getEnvProperties() { + return new Properties(); + } + @Bean public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { ListenerCatalog listenerCatalog = new ListenerCatalogFactory(conf).newInstance(confListenerList); @@ -65,8 +76,21 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException { @Bean public MessageReaderAdapter messageReaderAdapter() { - KafkaMessageReader delegate = KafkaMessageReaderBuilder.builder(bootstrapServers, topicName, instanceName).build(); + Properties consumerProperties = getConsumerProperties(); + KafkaMessageReader delegate = KafkaMessageReaderBuilder. + builder(bootstrapServers, topicName, instanceName). + withConsumerProperties(consumerProperties). + build(); return new MessageReaderAdapter(delegate); } -} + private Properties getConsumerProperties() { + Properties consumerProperties = new Properties(); + getEnvProperties().forEach((key, value) -> { + consumerProperties.put(key.toString(), value.toString()); + log.info("Consumer property {} set with value: {}", key, value); + } ); + return consumerProperties; + } + +} \ No newline at end of file diff --git a/drone-fly-integration-tests/pom.xml b/drone-fly-integration-tests/pom.xml index 28ccf23..b02d87d 100644 --- a/drone-fly-integration-tests/pom.xml +++ b/drone-fly-integration-tests/pom.xml @@ -76,5 +76,10 @@ junit-jupiter-params test + + software.amazon.msk + aws-msk-iam-auth + 1.1.1 +