Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OP-23081: Added kafka configs and producers #501

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gate-web/gate-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ dependencies {
force(true)
}
implementation 'org.apache.camel:camel-jackson:3.14.1'
implementation 'org.springframework.kafka:spring-kafka:3.1.2'

runtimeOnly "io.spinnaker.kork:kork-runtime"
runtimeOnly "org.springframework.boot:spring-boot-properties-migrator"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.netflix.spinnaker.gate.kafkaConfig;

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@ConditionalOnExpression("${kafka-broker.enabled:true}")
public class KafkaProducerConfig {

@Autowired private KafkaProperties kafkaProperties;

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
if (kafkaProperties.getSecurity() != null
&& StringUtils.isNotBlank(kafkaProperties.getSecurity().getProtocol())) {
configProps.put(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
kafkaProperties.getSecurity().getProtocol());
}
if (kafkaProperties.getSasl() != null
&& StringUtils.isNotBlank(kafkaProperties.getSasl().getMechanism())) {
configProps.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getSasl().getMechanism());
}
if (kafkaProperties.getSasl() != null
&& StringUtils.isNotBlank(kafkaProperties.getSasl().getUsername())
&& StringUtils.isNotBlank(kafkaProperties.getSasl().getPassword())) {
configProps.put(
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
+ kafkaProperties.getSasl().getUsername()
+ "\" password=\""
+ kafkaProperties.getSasl().getPassword()
+ "\";");
}
if (kafkaProperties.getSsl() != null
&& StringUtils.isNotBlank(kafkaProperties.getSsl().getKeyPassword())) {
configProps.put(
SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getKeyPassword());
}
if (kafkaProperties.getSsl() != null
&& StringUtils.isNotBlank(kafkaProperties.getSsl().getKeystorePassword())) {
configProps.put(
SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getKeystorePassword());
}
if (kafkaProperties.getSsl() != null
&& StringUtils.isNotBlank(kafkaProperties.getSsl().getKeystoreLocation())) {
configProps.put(
SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getKeystoreLocation());
}
if (kafkaProperties.getSsl() != null
&& StringUtils.isNotBlank(kafkaProperties.getSsl().getTruststoreLocation())) {
configProps.put(
SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getTruststoreLocation());
}
if (kafkaProperties.getSsl() != null
&& StringUtils.isNotBlank(kafkaProperties.getSsl().getTruststorePassword())) {
configProps.put(
SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaProperties.getSsl().getTruststorePassword());
}
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.netflix.spinnaker.gate.kafkaConfig;

import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Binds the {@code kafka-broker.*} configuration block used to build the Kafka producer.
 *
 * <p>Nested groups ({@code security}, {@code sasl}, {@code ssl}) are bound automatically through
 * the parent prefix; the nested classes therefore carry no Spring annotations of their own. The
 * previous version also registered each nested class as a separate {@code @Configuration} bean
 * bound to the same prefix, which created redundant duplicate beans.
 * NOTE(review): if any code autowired those nested beans directly, restore their registration —
 * verify before merging.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "kafka-broker")
@ConditionalOnExpression("${kafka-broker.enabled:true}")
public class KafkaProperties {

  // Broker bootstrap address(es), e.g. "host1:9092,host2:9092".
  private String bootstrapAddress;
  // Optional security.protocol group (kafka-broker.security.*).
  private Security security;
  // Optional SASL credentials group (kafka-broker.sasl.*).
  private Sasl sasl;
  // Optional SSL key/trust store group (kafka-broker.ssl.*).
  private Ssl ssl;

  /** Maps {@code kafka-broker.security.*}: the Kafka {@code security.protocol} value. */
  @Data
  public static class Security {
    private String protocol;
  }

  /** Maps {@code kafka-broker.sasl.*}: SASL mechanism and login credentials. */
  @Data
  public static class Sasl {
    private String mechanism;
    private String username;
    private String password;
  }

  /** Maps {@code kafka-broker.ssl.*}: key/trust store locations and passwords. */
  @Data
  public static class Ssl {
    private String keystoreLocation;
    private String keystorePassword;
    private String truststoreLocation;
    private String truststorePassword;
    private String keyPassword;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.netflix.spinnaker.gate.kafkaConfig;

/**
 * Kafka topic names used by gate producers.
 *
 * <p>Converted from a constant interface (an antipattern — interfaces define types, not constant
 * namespaces) to a non-instantiable holder class. Call sites are unchanged:
 * {@code ProducerTopics.USER_ACTIVITY_TOPIC}.
 */
public final class ProducerTopics {

  /** Topic receiving user-activity / audit events. */
  public static final String USER_ACTIVITY_TOPIC = "user-activity-topic";

  private ProducerTopics() {
    // constants only; never instantiated
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.netflix.spinnaker.gate.util;

import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Best-effort fan-out publisher: sends a payload to RabbitMQ (via Camel) and/or Kafka, depending
 * on which templates are present in the application context. Either template may be absent
 * (both are {@code @Autowired(required = false)}), in which case that leg is silently skipped.
 */
@Component
public class GeneralPublisher {

  @Autowired(required = false)
  private ProducerTemplate producerTemplate;

  // Typed <String, String> to match the StringSerializer-based producer factory; the original
  // raw KafkaTemplate compiled with unchecked warnings.
  @Autowired(required = false)
  private KafkaTemplate<String, String> kafkaTemplate;

  /**
   * Publishes {@code payload} asynchronously to the Camel endpoint and/or the Kafka topic.
   *
   * @param rabbitMQEndpoint Camel endpoint URI for the RabbitMQ leg
   * @param kafkaTopic Kafka topic name for the Kafka leg
   * @param payload message body sent verbatim on both legs
   */
  public void publish(String rabbitMQEndpoint, String kafkaTopic, String payload) {
    if (producerTemplate != null) {
      producerTemplate.asyncSendBody(rabbitMQEndpoint, payload);
    }
    if (kafkaTemplate != null) {
      kafkaTemplate.send(kafkaTopic, payload);
    }
  }

  // NOTE(review): intentionally a no-op in this change — implement or remove before relying on it.
  public void publishIsdToSpin(String rabbitMQEndpoint, String kafkaTopic, String payload) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.opsmx.spinnaker.gate.audit;

import com.google.gson.Gson;
import com.netflix.spinnaker.gate.kafkaConfig.ProducerTopics;
import com.netflix.spinnaker.gate.util.GeneralPublisher;
import com.opsmx.spinnaker.gate.constant.CamelEndpointConstant;
import com.opsmx.spinnaker.gate.enums.AuditEventType;
import com.opsmx.spinnaker.gate.model.AuditData;
Expand Down Expand Up @@ -45,6 +47,8 @@ public class AuthenticationAuditListener extends AbstractAuthenticationAuditList
@Autowired(required = false)
private AuditHandler auditHandler;

@Autowired private GeneralPublisher generalPublisher;

@Autowired @Lazy private ProducerTemplate template;

Gson gson = new Gson();
Expand All @@ -66,14 +70,17 @@ public void onApplicationEvent(AbstractAuthenticationEvent event) {
if (event.getAuthentication().isAuthenticated()
&& event instanceof AuthenticationSuccessEvent) {
log.debug("publishEvent AuthenticationSuccessEvent");
template.asyncSendBody(

generalPublisher.publish(
CamelEndpointConstant.directUserActivity,
ProducerTopics.USER_ACTIVITY_TOPIC,
auditHandler.publishEvent(AuditEventType.AUTHENTICATION_SUCCESSFUL_AUDIT, event));
} else if (!event.getAuthentication().isAuthenticated()
&& event instanceof AbstractAuthenticationFailureEvent) {
log.debug("publishEvent AbstractAuthenticationFailureEvent");
template.asyncSendBody(
generalPublisher.publish(
CamelEndpointConstant.directUserActivity,
ProducerTopics.USER_ACTIVITY_TOPIC,
auditHandler.publishEvent(AuditEventType.AUTHENTICATION_FAILURE_AUDIT, event));
} else if (event instanceof LogoutSuccessEvent) {
if (event
Expand All @@ -89,8 +96,9 @@ public void onApplicationEvent(AbstractAuthenticationEvent event) {
auditHandler.publishEvent(AuditEventType.SUCCESSFUL_USER_LOGOUT_AUDIT, event);
AbstractAuthenticationToken auth = (AbstractAuthenticationToken) event.getAuthentication();
String name = auth.getName();
template.asyncSendBody(
generalPublisher.publish(
CamelEndpointConstant.directUserActivity,
ProducerTopics.USER_ACTIVITY_TOPIC,
getOesAuditModel(
name, event.getTimestamp(), AuditEventType.SUCCESSFUL_USER_LOGOUT_AUDIT));
}
Expand All @@ -110,8 +118,9 @@ private void handleAuthenticationEvent(
.collect(Collectors.toList());
AuditData data = new AuditData(name, roles, event.getTimestamp());
auditHandler.publishEvent(eventType, data);
template.asyncSendBody(
generalPublisher.publish(
CamelEndpointConstant.directUserActivity,
ProducerTopics.USER_ACTIVITY_TOPIC,
getOesAuditModel(name, event.getTimestamp(), eventType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.opsmx.spinnaker.gate.audit;

import com.google.gson.Gson;
import com.netflix.spinnaker.gate.kafkaConfig.ProducerTopics;
import com.netflix.spinnaker.gate.util.GeneralPublisher;
import com.opsmx.spinnaker.gate.constant.CamelEndpointConstant;
import com.opsmx.spinnaker.gate.enums.AuditEventType;
import com.opsmx.spinnaker.gate.enums.OesServices;
Expand Down Expand Up @@ -48,6 +50,8 @@ public class UserActivityAuditListener implements ApplicationListener {

Gson gson = new Gson();

@Autowired private GeneralPublisher generalPublisher;

@Autowired
public UserActivityAuditListener(@Lazy AuditHandler auditHandler) {
this.auditHandler = auditHandler;
Expand All @@ -70,8 +74,10 @@ public void onApplicationEvent(ApplicationEvent event) {
log.debug("publishing the event to audit service : {}", auditData);
auditHandler.publishEvent(AuditEventType.USER_ACTIVITY_AUDIT, auditData);
}
template.asyncSendBody(
CamelEndpointConstant.directUserActivity, getOesAuditModel(auditData));
generalPublisher.publish(
CamelEndpointConstant.directUserActivity,
ProducerTopics.USER_ACTIVITY_TOPIC,
getOesAuditModel(auditData));
}
}
} catch (Exception e) {
Expand Down
Loading