From 7c677562d7afaeddad02d4d3adeba9d46fc18024 Mon Sep 17 00:00:00 2001 From: Utkarsh Shukla Date: Tue, 14 Jan 2025 11:35:58 +0530 Subject: [PATCH] OP-23081: Added kafka configs and producers --- gate-web/gate-web.gradle | 1 + .../gate/kafkaConfig/KafkaProducerConfig.java | 81 +++++++++++++++++++ .../gate/kafkaConfig/KafkaProperties.java | 48 +++++++++++ .../gate/kafkaConfig/ProducerTopics.java | 5 ++ .../spinnaker/gate/util/GeneralPublisher.java | 27 +++++++ .../audit/AuthenticationAuditListener.java | 17 +++- .../gate/audit/UserActivityAuditListener.java | 10 ++- 7 files changed, 183 insertions(+), 6 deletions(-) create mode 100644 gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProducerConfig.java create mode 100644 gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProperties.java create mode 100644 gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/ProducerTopics.java create mode 100644 gate-web/src/main/java/com/netflix/spinnaker/gate/util/GeneralPublisher.java diff --git a/gate-web/gate-web.gradle b/gate-web/gate-web.gradle index d94b0ca8da..41360c935d 100644 --- a/gate-web/gate-web.gradle +++ b/gate-web/gate-web.gradle @@ -71,6 +71,7 @@ dependencies { force(true) } implementation 'org.apache.camel:camel-jackson:3.14.1' + implementation 'org.springframework.kafka:spring-kafka:3.1.2' runtimeOnly "io.spinnaker.kork:kork-runtime" runtimeOnly "org.springframework.boot:spring-boot-properties-migrator" diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProducerConfig.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProducerConfig.java new file mode 100644 index 0000000000..a474d9dd34 --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProducerConfig.java @@ -0,0 +1,81 @@ +package com.netflix.spinnaker.gate.kafkaConfig; + +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@Configuration +@ConditionalOnExpression("${kafka-broker.enabled:true}") +public class KafkaProducerConfig { + + @Autowired private KafkaProperties kafkaProperties; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress()); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + if (kafkaProperties.getSecurity() != null + && StringUtils.isNotBlank(kafkaProperties.getSecurity().getProtocol())) { + configProps.put( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + kafkaProperties.getSecurity().getProtocol()); + } + if (kafkaProperties.getSasl() != null + && StringUtils.isNotBlank(kafkaProperties.getSasl().getMechanism())) { + configProps.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getSasl().getMechanism()); + } + if (kafkaProperties.getSasl() != null + && StringUtils.isNotBlank(kafkaProperties.getSasl().getUsername()) + && StringUtils.isNotBlank(kafkaProperties.getSasl().getPassword())) { + configProps.put( + SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + + kafkaProperties.getSasl().getUsername() + + "\" password=\"" + + kafkaProperties.getSasl().getPassword() + + "\";"); + } + if (kafkaProperties.getSsl() != null + && StringUtils.isNotBlank(kafkaProperties.getSsl().getKeyPassword())) { + configProps.put( + SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getKeyPassword()); + } + if (kafkaProperties.getSsl() != null + && StringUtils.isNotBlank(kafkaProperties.getSsl().getKeystorePassword())) { + configProps.put( + SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getKeystorePassword()); + } + if (kafkaProperties.getSsl() != null + && StringUtils.isNotBlank(kafkaProperties.getSsl().getKeystoreLocation())) { + configProps.put( + SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getKeystoreLocation()); + } + if (kafkaProperties.getSsl() != null + && StringUtils.isNotBlank(kafkaProperties.getSsl().getTruststoreLocation())) { + configProps.put( + SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getTruststoreLocation()); + } + if (kafkaProperties.getSsl() != null + && StringUtils.isNotBlank(kafkaProperties.getSsl().getTruststorePassword())) { + configProps.put( + SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getTruststorePassword()); + } + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProperties.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProperties.java new file mode 100644 index 0000000000..23a4c7afc0 --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/KafkaProperties.java @@ -0,0 +1,48 @@ +package com.netflix.spinnaker.gate.kafkaConfig; + +import lombok.Data; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka-broker") +@ConditionalOnExpression("${kafka-broker.enabled:true}") +public class KafkaProperties { + + private String bootstrapAddress; + private Security security; + private Sasl sasl; + private Ssl ssl; + + @Data + @Configuration + @ConditionalOnExpression("${kafka-broker.enabled:true}") + @ConfigurationProperties(prefix = "kafka-broker.security") + public static class Security { + private String protocol; + } + + @Data + @Configuration + @ConditionalOnExpression("${kafka-broker.enabled:true}") + @ConfigurationProperties(prefix = "kafka-broker.sasl") + public static class Sasl { + private String mechanism; + private String username; + private String password; + } + + @Data + @Configuration + @ConditionalOnExpression("${kafka-broker.enabled:true}") + @ConfigurationProperties(prefix = "kafka-broker.ssl") + public static class Ssl { + private String keystoreLocation; + private String keystorePassword; + private String truststoreLocation; + private String truststorePassword; + private String keyPassword; + } +} diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/ProducerTopics.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/ProducerTopics.java new file mode 100644 index 0000000000..7dd4434b6d --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/kafkaConfig/ProducerTopics.java @@ -0,0 +1,5 @@ +package com.netflix.spinnaker.gate.kafkaConfig; + +public interface ProducerTopics { + String USER_ACTIVITY_TOPIC = "user-activity-topic"; +} diff --git a/gate-web/src/main/java/com/netflix/spinnaker/gate/util/GeneralPublisher.java b/gate-web/src/main/java/com/netflix/spinnaker/gate/util/GeneralPublisher.java new file mode 100644 index 0000000000..040e8fe289 --- /dev/null +++ b/gate-web/src/main/java/com/netflix/spinnaker/gate/util/GeneralPublisher.java @@ -0,0 +1,27 @@ +package com.netflix.spinnaker.gate.util; + +import org.apache.camel.ProducerTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +public class GeneralPublisher { + + @Autowired(required = false) + private ProducerTemplate producerTemplate; + + @Autowired(required = false) + private KafkaTemplate kafkaTemplate; + + public void publish(String rabbitMQEndpoint, String kafkaTopic, String payload) { + if (producerTemplate != null) { + producerTemplate.asyncSendBody(rabbitMQEndpoint, payload); + } + if (kafkaTemplate != null) { + kafkaTemplate.send(kafkaTopic, payload); + } + } + + public void publishIsdToSpin(String rabbitMQEndpoint, String kafkaTopic, String payload) {} +} diff --git a/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/AuthenticationAuditListener.java b/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/AuthenticationAuditListener.java index 5d7513931f..343b17b79a 100644 --- a/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/AuthenticationAuditListener.java +++ b/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/AuthenticationAuditListener.java @@ -17,6 +17,8 @@ package com.opsmx.spinnaker.gate.audit; import com.google.gson.Gson; +import com.netflix.spinnaker.gate.kafkaConfig.ProducerTopics; +import com.netflix.spinnaker.gate.util.GeneralPublisher; import com.opsmx.spinnaker.gate.constant.CamelEndpointConstant; import com.opsmx.spinnaker.gate.enums.AuditEventType; import com.opsmx.spinnaker.gate.model.AuditData; @@ -45,6 +47,8 @@ public class AuthenticationAuditListener extends AbstractAuthenticationAuditList @Autowired(required = false) private AuditHandler auditHandler; + @Autowired private GeneralPublisher generalPublisher; + @Autowired @Lazy private ProducerTemplate template; Gson gson = new Gson(); @@ -66,14 +70,17 @@ public void onApplicationEvent(AbstractAuthenticationEvent event) { if (event.getAuthentication().isAuthenticated() && event instanceof AuthenticationSuccessEvent) { log.debug("publishEvent AuthenticationSuccessEvent"); - template.asyncSendBody( + + generalPublisher.publish( CamelEndpointConstant.directUserActivity, + ProducerTopics.USER_ACTIVITY_TOPIC, auditHandler.publishEvent(AuditEventType.AUTHENTICATION_SUCCESSFUL_AUDIT, event)); } else if (!event.getAuthentication().isAuthenticated() && event instanceof AbstractAuthenticationFailureEvent) { log.debug("publishEvent AbstractAuthenticationFailureEvent"); - template.asyncSendBody( + generalPublisher.publish( CamelEndpointConstant.directUserActivity, + ProducerTopics.USER_ACTIVITY_TOPIC, auditHandler.publishEvent(AuditEventType.AUTHENTICATION_FAILURE_AUDIT, event)); } else if (event instanceof LogoutSuccessEvent) { if (event @@ -89,8 +96,9 @@ public void onApplicationEvent(AbstractAuthenticationEvent event) { auditHandler.publishEvent(AuditEventType.SUCCESSFUL_USER_LOGOUT_AUDIT, event); AbstractAuthenticationToken auth = (AbstractAuthenticationToken) event.getAuthentication(); String name = auth.getName(); - template.asyncSendBody( + generalPublisher.publish( CamelEndpointConstant.directUserActivity, + ProducerTopics.USER_ACTIVITY_TOPIC, getOesAuditModel( name, event.getTimestamp(), AuditEventType.SUCCESSFUL_USER_LOGOUT_AUDIT)); } @@ -110,8 +118,9 @@ private void handleAuthenticationEvent( .collect(Collectors.toList()); AuditData data = new AuditData(name, roles, event.getTimestamp()); auditHandler.publishEvent(eventType, data); - template.asyncSendBody( + generalPublisher.publish( CamelEndpointConstant.directUserActivity, + ProducerTopics.USER_ACTIVITY_TOPIC, getOesAuditModel(name, event.getTimestamp(), eventType)); } diff --git a/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/UserActivityAuditListener.java b/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/UserActivityAuditListener.java index 9b1e8cea7f..27872819da 100644 --- a/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/UserActivityAuditListener.java +++ b/gate-web/src/main/java/com/opsmx/spinnaker/gate/audit/UserActivityAuditListener.java @@ -17,6 +17,8 @@ package com.opsmx.spinnaker.gate.audit; import com.google.gson.Gson; +import com.netflix.spinnaker.gate.kafkaConfig.ProducerTopics; +import com.netflix.spinnaker.gate.util.GeneralPublisher; import com.opsmx.spinnaker.gate.constant.CamelEndpointConstant; import com.opsmx.spinnaker.gate.enums.AuditEventType; import com.opsmx.spinnaker.gate.enums.OesServices; @@ -48,6 +50,8 @@ public class UserActivityAuditListener implements ApplicationListener { Gson gson = new Gson(); + @Autowired private GeneralPublisher generalPublisher; + @Autowired public UserActivityAuditListener(@Lazy AuditHandler auditHandler) { this.auditHandler = auditHandler; @@ -70,8 +74,10 @@ public void onApplicationEvent(ApplicationEvent event) { log.debug("publishing the event to audit service : {}", auditData); auditHandler.publishEvent(AuditEventType.USER_ACTIVITY_AUDIT, auditData); } - template.asyncSendBody( - CamelEndpointConstant.directUserActivity, getOesAuditModel(auditData)); + generalPublisher.publish( + CamelEndpointConstant.directUserActivity, + ProducerTopics.USER_ACTIVITY_TOPIC, + getOesAuditModel(auditData)); } } } catch (Exception e) {