Skip to content

Commit

Permalink
OP-23081: Added kafka configs for camel (#502)
Browse files Browse the repository at this point in the history
* OP-23081: Added kafka configs for camel

* OP-23081: Added kafka configs for camel

---------

Co-authored-by: Utkarsh Shukla <[email protected]>
  • Loading branch information
utkarsh-opsmx and Utkarsh Shukla authored Jan 23, 2025
1 parent 667d2e4 commit 190b2dd
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 0 deletions.
1 change: 1 addition & 0 deletions gate-web/gate-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dependencies {
implementation ('org.apache.camel:camel-rabbitmq:3.14.1') {
exclude group: 'com.rabbitmq', module: 'amqp-client'
}
implementation('org.apache.camel:camel-kafka:3.14.1')
implementation ("com.rabbitmq:amqp-client:5.18.0") {
force(true)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.netflix.spinnaker.gate.config;

import static com.opsmx.spinnaker.gate.constant.CamelEndpointConstant.directUserActivity;

import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConditionalOnProperty(value = "message-broker.endpoint.name", havingValue = "kafka")
public class KafkaConfig implements CamelRouteConfig {

@Autowired private KafkaProperties kafkaProperties;

private static final String groupId = "oes-sapor-consumer-group";

@Override
public String getUserActivityQueueEndPoint() {
return "kafka:"
+ directUserActivity
+ "?brokers="
+ kafkaProperties.getBootstrapAddress()
+ "&groupid="
+ groupId
+ getSecurityString();
}

private String getSecurityString() {
String securityString = "";
if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-256")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
+ kafkaProperties.getSasl().getUsername()
+ "\" password=\""
+ kafkaProperties.getSasl().getPassword()
+ "\";";
}

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "PLAIN")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\""
+ kafkaProperties.getSasl().getUsername()
+ "\\\" password=\\\""
+ kafkaProperties.getSasl().getPassword()
+ "\\\";";
}

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "AWS_MSK_IAM")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;"
+ "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMClientCallbackHandler";
}

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "OAUTHBEARER")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
+ "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler"
+ "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler";
}
return securityString;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.netflix.spinnaker.gate.config;

import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
@ConfigurationProperties(prefix = "message-broker")
@ConditionalOnExpression("${message-broker.enabled:true}")
public class KafkaProperties {

private String bootstrapAddress;
private Security security;
private Sasl sasl;
private Ssl ssl;

@Data
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConfigurationProperties(prefix = "message-broker.security")
public static class Security {
private String protocol;
}

@Data
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConfigurationProperties(prefix = "message-broker.sasl")
public static class Sasl {
private String mechanism;
private String username;
private String password;
}

@Data
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConfigurationProperties(prefix = "message-broker.ssl")
public static class Ssl {
private String keystoreLocation;
private String keystorePassword;
private String truststoreLocation;
private String truststorePassword;
private String keyPassword;
}
}

0 comments on commit 190b2dd

Please sign in to comment.