From 24ac0c5ad7748f26793c004bb30cf10767b6a6cd Mon Sep 17 00:00:00 2001 From: Utkarsh Shukla Date: Tue, 21 Jan 2025 08:37:28 +0530 Subject: [PATCH 1/2] OP-23081: Added kafka configs for camel --- gate-web/gate-web.gradle | 1 + .../spinnaker/gate/config/KafkaConfig.java | 94 +++++++++++++++++++ .../gate/config/KafkaProperties.java | 48 ++++++++++ 3 files changed, 143 insertions(+) create mode 100644 gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java create mode 100644 gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaProperties.java diff --git a/gate-web/gate-web.gradle b/gate-web/gate-web.gradle index d94b0ca8da..1710e30ddb 100644 --- a/gate-web/gate-web.gradle +++ b/gate-web/gate-web.gradle @@ -67,6 +67,7 @@ dependencies { implementation ('org.apache.camel:camel-rabbitmq:3.14.1') { exclude group: 'com.rabbitmq', module: 'amqp-client' } + implementation('org.apache.camel:camel-kafka:3.14.1') implementation ("com.rabbitmq:amqp-client:5.18.0") { force(true) } diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java new file mode 100644 index 0000000000..8ec5114fa2 --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java @@ -0,0 +1,94 @@ +package com.netflix.spinnaker.gate.config; + +import static com.opsmx.spinnaker.gate.constant.CamelEndpointConstant.directUserActivity; + +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +@ConditionalOnExpression("${message-broker.enabled:true}") +@ConditionalOnProperty(value = "message-broker.endpoint.name", havingValue = "kafka") +public class KafkaConfig implements CamelRouteConfig { + + @Autowired private KafkaProperties kafkaProperties; + + private static final String groupId = "oes-sapor-consumer-group"; + + @Override + public String getUserActivityQueueEndPoint() { + return "kafka:" + + directUserActivity + + "?brokers=" + + kafkaProperties.getBootstrapAddress() + + "&groupid=" + + groupId + + getSecurityString(); + } + + private String getSecurityString() { + String securityString = ""; + log.info("Entered kafkaConfig constructor "); + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-256")) { + log.info("Entered scram sha"); + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + + kafkaProperties.getSasl().getUsername() + + "\" password=\"" + + kafkaProperties.getSasl().getPassword() + + "\";"; + } + + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "PLAIN")) { + log.info("Entered sasl plain"); + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"" + + kafkaProperties.getSasl().getUsername() + + "\\\" password=\\\"" + + kafkaProperties.getSasl().getPassword() + + "\\\";"; + } + + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "AWS_MSK_IAM")) { + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;" + + "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMClientCallbackHandler"; + } + + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "OAUTHBEARER")) { + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + + "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler" + + "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler"; + } + return securityString; + } +} diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaProperties.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaProperties.java new file mode 100644 index 0000000000..029d0b18e5 --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaProperties.java @@ -0,0 +1,48 @@ +package com.netflix.spinnaker.gate.config; + +import lombok.Data; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "message-broker") +@ConditionalOnExpression("${message-broker.enabled:true}") +public class KafkaProperties { + + private String bootstrapAddress; + private Security security; + private Sasl sasl; + private Ssl ssl; + + @Data + @Configuration + @ConditionalOnExpression("${message-broker.enabled:true}") + @ConfigurationProperties(prefix = "message-broker.security") + public static class Security { + private String protocol; + } + + @Data + @Configuration + @ConditionalOnExpression("${message-broker.enabled:true}") + @ConfigurationProperties(prefix = "message-broker.sasl") + public static class Sasl { + private String mechanism; + private String username; + private String password; + } + + @Data + @Configuration + @ConditionalOnExpression("${message-broker.enabled:true}") + @ConfigurationProperties(prefix = "message-broker.ssl") + public static class Ssl { + private String keystoreLocation; + private String keystorePassword; + private String truststoreLocation; + private String truststorePassword; + private String keyPassword; + } +} From d51558fe3ab2a4cf4e037ef9e264aa056c51d874 Mon Sep 17 00:00:00 2001 From: Utkarsh Shukla Date: Tue, 21 Jan 2025 08:45:56 +0530 Subject: [PATCH 2/2] OP-23081: Added kafka configs for camel --- .../java/com/netflix/spinnaker/gate/config/KafkaConfig.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java index 8ec5114fa2..c20cb3d963 100644 --- a/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java @@ -32,11 +32,9 @@ public String getUserActivityQueueEndPoint() { private String getSecurityString() { String securityString = ""; - log.info("Entered kafkaConfig constructor "); if (kafkaProperties.getSecurity() != null && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT") && Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-256")) { - log.info("Entered scram sha"); securityString = "&securityProtocol=" + kafkaProperties.getSecurity().getProtocol() @@ -52,7 +50,6 @@ private String getSecurityString() { if (kafkaProperties.getSecurity() != null && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT") && Objects.equals(kafkaProperties.getSasl().getMechanism(), "PLAIN")) { - log.info("Entered sasl plain"); securityString = "&securityProtocol=" + kafkaProperties.getSecurity().getProtocol()