Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OP-23081: Added kafka configs for camel #502

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gate-web/gate-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dependencies {
implementation ('org.apache.camel:camel-rabbitmq:3.14.1') {
exclude group: 'com.rabbitmq', module: 'amqp-client'
}
implementation('org.apache.camel:camel-kafka:3.14.1')
implementation ("com.rabbitmq:amqp-client:5.18.0") {
force(true)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.netflix.spinnaker.gate.config;

import static com.opsmx.spinnaker.gate.constant.CamelEndpointConstant.directUserActivity;

import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConditionalOnProperty(value = "message-broker.endpoint.name", havingValue = "kafka")
public class KafkaConfig implements CamelRouteConfig {

@Autowired private KafkaProperties kafkaProperties;

private static final String groupId = "oes-sapor-consumer-group";

@Override
public String getUserActivityQueueEndPoint() {
return "kafka:"
+ directUserActivity
+ "?brokers="
+ kafkaProperties.getBootstrapAddress()
+ "&groupid="
+ groupId
+ getSecurityString();
}

private String getSecurityString() {
String securityString = "";
if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-256")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
+ kafkaProperties.getSasl().getUsername()
+ "\" password=\""
+ kafkaProperties.getSasl().getPassword()
+ "\";";
}

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "PLAIN")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\""
+ kafkaProperties.getSasl().getUsername()
+ "\\\" password=\\\""
+ kafkaProperties.getSasl().getPassword()
+ "\\\";";
}

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "AWS_MSK_IAM")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;"
+ "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMClientCallbackHandler";
}

if (kafkaProperties.getSecurity() != null
&& Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL")
&& Objects.equals(kafkaProperties.getSasl().getMechanism(), "OAUTHBEARER")) {
securityString =
"&securityProtocol="
+ kafkaProperties.getSecurity().getProtocol()
+ "&saslMechanism="
+ kafkaProperties.getSasl().getMechanism()
+ "&saslJaasConfig=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
+ "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler"
+ "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler";
}
return securityString;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.netflix.spinnaker.gate.config;

import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
@ConfigurationProperties(prefix = "message-broker")
@ConditionalOnExpression("${message-broker.enabled:true}")
public class KafkaProperties {

private String bootstrapAddress;
private Security security;
private Sasl sasl;
private Ssl ssl;

@Data
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConfigurationProperties(prefix = "message-broker.security")
public static class Security {
private String protocol;
}

@Data
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConfigurationProperties(prefix = "message-broker.sasl")
public static class Sasl {
private String mechanism;
private String username;
private String password;
}

@Data
@Configuration
@ConditionalOnExpression("${message-broker.enabled:true}")
@ConfigurationProperties(prefix = "message-broker.ssl")
public static class Ssl {
private String keystoreLocation;
private String keystorePassword;
private String truststoreLocation;
private String truststorePassword;
private String keyPassword;
}
}
Loading