Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding config properties map for kafka consumer #33

Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.0.1] - 2024-10-24
### Added
* Support for consumer properties allowing connecting to Kafka cloud provider.

## [1.0.0] - 2023-04-27
### Changed
* Upgrade `Springboot` version from `2.3.3.RELEASE` to `2.7.10`.
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,26 @@ The table below describes all the available configuration values for Drone Fly.
| instance.name | Instance name for a Drone Fly instance. `instance.name` is also used to derive the Kafka consumer group. Therefore, in a multi-instance deployment, a unique `instance.name` for each Drone Fly instance needs to be provided to avoid all instances ending up in the same Kafka consumer group. | `string` | `drone-fly` | no |
| endpoint.port | Port on which Drone Fly Spring Boot app will start. | `string` | `8008` | no |

### Additional configuration parameters
The Kafka message reader supports properties that are passed to the Kafka consumer builder.
These are environment variables with the PREFIX apiary.messaging.consumer.

#### Example for sending consumer parameters when using a Kafka cloud provider
- apiary.messaging.consumer.security.protocol=SSL
- apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM
- apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
- apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

In this case we are sending the properties to Kafka's consumer to be able to connect to AWS MSK which also requires the IAM library included as a dependency in the POM.xml file

java -Dloader.path=lib/ -jar drone-fly-app-<version>-exec.jar \
--apiary.bootstrap.servers=localhost:9092 \
--apiary.kafka.topic.name=apiary \
--apiary.listener.list="com.expediagroup.sampleListener1,com.expediagroup.sampleListener2" \
--apiary.messaging.consumer.security.protocol=SSL \
--apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM \
--apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \
--apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

## Metrics

Expand Down
5 changes: 5 additions & 0 deletions drone-fly-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.expediagroup.dataplatform.dronefly.app.context;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -24,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -32,10 +34,12 @@
import com.expediagroup.dataplatform.dronefly.app.messaging.MessageReaderAdapter;
import com.expediagroup.dataplatform.dronefly.app.service.ListenerCatalog;
import com.expediagroup.dataplatform.dronefly.app.service.factory.ListenerCatalogFactory;
import org.springframework.context.annotation.Primary;

@Configuration
public class CommonBeans {
private static final Logger log = LoggerFactory.getLogger(CommonBeans.class);
public static final String CONSUMER_PROPERTIES_PREFIX = "apiary.messaging.consumer";

@Value("${instance.name:drone-fly}")
private String instanceName;
Expand All @@ -54,6 +58,13 @@ public HiveConf hiveConf() {
return new HiveConf();
}

@Bean
@Primary
hlegarda marked this conversation as resolved.
Show resolved Hide resolved
@ConfigurationProperties(CONSUMER_PROPERTIES_PREFIX)
public Properties getEnvProperties() {
return new Properties();
}

@Bean
public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException {
ListenerCatalog listenerCatalog = new ListenerCatalogFactory(conf).newInstance(confListenerList);
Expand All @@ -65,8 +76,21 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException {

@Bean
public MessageReaderAdapter messageReaderAdapter() {
KafkaMessageReader delegate = KafkaMessageReaderBuilder.builder(bootstrapServers, topicName, instanceName).build();
Properties consumerProperties = getConsumerProperties();
KafkaMessageReader delegate = KafkaMessageReaderBuilder.
builder(bootstrapServers, topicName, instanceName).
withConsumerProperties(consumerProperties).
build();
return new MessageReaderAdapter(delegate);
}

}
private Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
getEnvProperties().forEach((key, value) -> {
consumerProperties.put(key.toString(), value.toString());
hlegarda marked this conversation as resolved.
Show resolved Hide resolved
log.info("Consumer property {} set with value: {}", key, value);
} );
return consumerProperties;
}

}
5 changes: 5 additions & 0 deletions drone-fly-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,10 @@
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
</project>
Loading