diff --git a/gate-web/gate-web.gradle b/gate-web/gate-web.gradle index d94b0ca8da..1710e30ddb 100644 --- a/gate-web/gate-web.gradle +++ b/gate-web/gate-web.gradle @@ -67,6 +67,7 @@ dependencies { implementation ('org.apache.camel:camel-rabbitmq:3.14.1') { exclude group: 'com.rabbitmq', module: 'amqp-client' } + implementation('org.apache.camel:camel-kafka:3.14.1') implementation ("com.rabbitmq:amqp-client:5.18.0") { force(true) } diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java new file mode 100644 index 0000000000..c20cb3d963 --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaConfig.java @@ -0,0 +1,91 @@ +package com.netflix.spinnaker.gate.config; + +import static com.opsmx.spinnaker.gate.constant.CamelEndpointConstant.directUserActivity; + +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +@ConditionalOnExpression("${message-broker.enabled:true}") +@ConditionalOnProperty(value = "message-broker.endpoint.name", havingValue = "kafka") +public class KafkaConfig implements CamelRouteConfig { + + @Autowired private KafkaProperties kafkaProperties; + + private static final String groupId = "oes-sapor-consumer-group"; + + @Override + public String getUserActivityQueueEndPoint() { + return "kafka:" + + directUserActivity + + "?brokers=" + + kafkaProperties.getBootstrapAddress() + + "&groupid=" + + groupId + + getSecurityString(); + } + + private String getSecurityString() { + String securityString = ""; + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "SCRAM-SHA-256")) { + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + + kafkaProperties.getSasl().getUsername() + + "\" password=\"" + + kafkaProperties.getSasl().getPassword() + + "\";"; + } + + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_PLAINTEXT") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "PLAIN")) { + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"" + + kafkaProperties.getSasl().getUsername() + + "\\\" password=\\\"" + + kafkaProperties.getSasl().getPassword() + + "\\\";"; + } + + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "AWS_MSK_IAM")) { + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;" + + "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMClientCallbackHandler"; + } + + if (kafkaProperties.getSecurity() != null + && Objects.equals(kafkaProperties.getSecurity().getProtocol(), "SASL_SSL") + && Objects.equals(kafkaProperties.getSasl().getMechanism(), "OAUTHBEARER")) { + securityString = + "&securityProtocol=" + + kafkaProperties.getSecurity().getProtocol() + + "&saslMechanism=" + + kafkaProperties.getSasl().getMechanism() + + "&saslJaasConfig=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + + "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler" + + "&saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler"; + } + return securityString; + } +} diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaProperties.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaProperties.java new file mode 100644 index 0000000000..029d0b18e5 --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/config/KafkaProperties.java @@ -0,0 +1,48 @@ +package com.netflix.spinnaker.gate.config; + +import lombok.Data; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "message-broker") +@ConditionalOnExpression("${message-broker.enabled:true}") +public class KafkaProperties { + + private String bootstrapAddress; + private Security security; + private Sasl sasl; + private Ssl ssl; + + @Data + @Configuration + @ConditionalOnExpression("${message-broker.enabled:true}") + @ConfigurationProperties(prefix = "message-broker.security") + public static class Security { + private String protocol; + } + + @Data + @Configuration + @ConditionalOnExpression("${message-broker.enabled:true}") + @ConfigurationProperties(prefix = "message-broker.sasl") + public static class Sasl { + private String mechanism; + private String username; + private String password; + } + + @Data + @Configuration + @ConditionalOnExpression("${message-broker.enabled:true}") + @ConfigurationProperties(prefix = "message-broker.ssl") + public static class Ssl { + private String keystoreLocation; + private String keystorePassword; + private String truststoreLocation; + private String truststorePassword; + private String keyPassword; + } +}