Skip to content

Commit

Permalink
[ANCHOR-836] Add configuration event.queue.kafka.ssl_cert_verify (#…
Browse files Browse the repository at this point in the history
…1528)

### Description

- Add configuration `event.queue.kafka.ssl_cert_verify` 

### Context

This flag is for dev and test environment where a valid CA may not be
present.
  • Loading branch information
lijamie98 authored Oct 1, 2024
1 parent 560d342 commit 52833bf
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package org.stellar.anchor.platform.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class KafkaConfig {
/**
* A comma-separated list of host:port pairs that are the addresses of one or more brokers in a
Expand Down Expand Up @@ -38,6 +40,9 @@ public class KafkaConfig {
/** The SASL mechanism used for authentication. */
SaslMechanism saslMechanism;

/** The certificate verification flag. */
Boolean sslVerifyCert = Boolean.TRUE;

/** the SSL keystore location. */
String sslKeystoreLocation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class MskConfig extends KafkaConfig {
int pollTimeoutSeconds,
SecurityProtocol securityProtocol,
SaslMechanism saslMechanism,
Boolean sslVerifyCert,
String sslKeystoreLocation,
String sslTruststoreLocation) {

Expand All @@ -32,6 +33,7 @@ public class MskConfig extends KafkaConfig {
pollTimeoutSeconds,
securityProtocol,
saslMechanism,
sslVerifyCert,
sslKeystoreLocation,
sslTruststoreLocation);
this.useIAM = useIAM;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ void validateKafka(PropertyQueueConfig config, Errors errors) {
}
}

if (kafkaConfig.getSecurityProtocol() == KafkaConfig.SecurityProtocol.SASL_SSL) {
if (kafkaConfig.getSecurityProtocol() == KafkaConfig.SecurityProtocol.SASL_SSL
&& kafkaConfig.getSslVerifyCert()) {
if (kafkaConfig.getSaslMechanism() == null) {
errors.reject(
"kafka-sasl-mechanism-empty",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.stellar.anchor.event.EventService;
import org.stellar.anchor.event.EventService.EventQueue;
import org.stellar.anchor.platform.config.KafkaConfig;
import org.stellar.anchor.platform.utils.TrustAllSslEngineFactory;
import org.stellar.anchor.util.GsonUtils;
import org.stellar.anchor.util.Log;

Expand Down Expand Up @@ -183,55 +184,57 @@ Consumer<String, String> createConsumer() throws InvalidConfigException {
return new KafkaConsumer<>(props);
}

void configureAuth(Properties props) throws InvalidConfigException {
switch (kafkaConfig.getSecurityProtocol()) {
case SASL_PLAINTEXT:
case SASL_SSL:
// Check if the username and password are set
if (isEmpty(secret(SECRET_EVENTS_QUEUE_KAFKA_USERNAME))) {
String msg =
SECRET_EVENTS_QUEUE_KAFKA_USERNAME
+ " is not set. Please provide the Kafka username.";
Log.error(msg);
throw new InvalidConfigException(msg);
}
if (isEmpty(secret(SECRET_EVENTS_QUEUE_KAFKA_PASSWORD))) {
String msg =
SECRET_EVENTS_QUEUE_KAFKA_PASSWORD
+ " is not set. Please provide the Kafka password.";
Log.error(msg);
throw new InvalidConfigException(msg);
}

// Set the SASL login information
props.put(
"sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ secret(SECRET_EVENTS_QUEUE_KAFKA_USERNAME)
+ "\" password=\""
+ secret(SECRET_EVENTS_QUEUE_KAFKA_PASSWORD)
+ "\";");

break;
void configureAuthSaslLogin(Properties props) throws InvalidConfigException {
// Check if the username and password are set
if (isEmpty(secret(SECRET_EVENTS_QUEUE_KAFKA_USERNAME))) {
String msg =
SECRET_EVENTS_QUEUE_KAFKA_USERNAME + " is not set. Please provide the Kafka username.";
Log.error(msg);
throw new InvalidConfigException(msg);
}
if (isEmpty(secret(SECRET_EVENTS_QUEUE_KAFKA_PASSWORD))) {
String msg =
SECRET_EVENTS_QUEUE_KAFKA_PASSWORD + " is not set. Please provide the Kafka password.";
Log.error(msg);
throw new InvalidConfigException(msg);
}

// Set the SASL login information
props.put(
"sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ secret(SECRET_EVENTS_QUEUE_KAFKA_USERNAME)
+ "\" password=\""
+ secret(SECRET_EVENTS_QUEUE_KAFKA_PASSWORD)
+ "\";");
}

void configureAuth(Properties props) throws InvalidConfigException {
switch (kafkaConfig.getSecurityProtocol()) {
case SASL_PLAINTEXT:
configureAuthSaslLogin(props);
props.put(SECURITY_PROTOCOL_CONFIG, kafkaConfig.getSecurityProtocol().name());
props.put(SASL_MECHANISM, kafkaConfig.getSaslMechanism().getValue());
break;
case SASL_SSL:
configureAuthSaslLogin(props);
props.put(SECURITY_PROTOCOL_CONFIG, kafkaConfig.getSecurityProtocol().name());
props.put(SASL_MECHANISM, kafkaConfig.getSaslMechanism().getValue());
props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);

if (!isEmpty(secret(SECRET_SSL_KEYSTORE_PASSWORD)))
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, secret(SECRET_SSL_KEYSTORE_PASSWORD));
if (!isEmpty(secret(SECRET_SSL_KEY_PASSWORD)))
props.put(SSL_KEY_PASSWORD_CONFIG, secret(SECRET_SSL_KEY_PASSWORD));
if (!isEmpty(secret(SECRET_SSL_TRUSTSTORE_PASSWORD)))
props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, secret(SECRET_SSL_TRUSTSTORE_PASSWORD));

if (!kafkaConfig.getSslVerifyCert()) {
props.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
props.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, TrustAllSslEngineFactory.class);
} else {
props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);

if (!isEmpty(secret(SECRET_SSL_KEYSTORE_PASSWORD)))
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, secret(SECRET_SSL_KEYSTORE_PASSWORD));
if (!isEmpty(secret(SECRET_SSL_KEY_PASSWORD)))
props.put(SSL_KEY_PASSWORD_CONFIG, secret(SECRET_SSL_KEY_PASSWORD));
if (!isEmpty(secret(SECRET_SSL_TRUSTSTORE_PASSWORD)))
props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, secret(SECRET_SSL_TRUSTSTORE_PASSWORD));
}
break;
case PLAINTEXT:
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.stellar.anchor.platform.utils;

import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.kafka.common.security.auth.SslEngineFactory;

public class TrustAllSslEngineFactory implements SslEngineFactory {

private final TrustManager TRUST_ALL_MANAGER =
new X509TrustManager() {

public X509Certificate[] getAcceptedIssuers() {
return null;
}

public void checkClientTrusted(X509Certificate[] certs, String authType) {
// empty
}

public void checkServerTrusted(X509Certificate[] certs, String authType) {
// empty
}
};

@Override
public SSLEngine createClientSslEngine(
String peerHost, int peerPort, String endpointIdentification) {
TrustManager[] trustManagers = new TrustManager[] {TRUST_ALL_MANAGER};
try {
SSLContext sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, trustManagers, new SecureRandom());
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
sslEngine.setUseClientMode(true);
return sslEngine;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new RuntimeException(e);
}
}

@Override
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
return null;
}

@Override
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
return false;
}

@Override
public Set<String> reconfigurableConfigs() {
return null;
}

@Override
public KeyStore keystore() {
return null;
}

@Override
public KeyStore truststore() {
return null;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,10 @@ events:
# `sasl_mechanism` can be one of the following supported mechanisms:
# `PLAIN`: PLAIN SASL mechanism
sasl_mechanism: PLAIN
# The configuration to enable SSL certificate verification. If true, the SSL certificate will be verified.
# For dev and test environments, it can be used to disable SSL certificate verification.
# For production environments, it is strongly recommended to set to true to avoid man-in-the-middle attacks.
ssl_verify_cert: true
# The SSL keystore location. The file path to the keystore file (jks).
ssl_keystore_location:
# The SSL truststore location. The file path to the truststore file (jks).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ events.queue.kafka.security_protocol:
events.queue.kafka.sasl_mechanism:
events.queue.kafka.username:
events.queue.kafka.password:
events.queue.kafka.ssl_verify_cert:
events.queue.kafka.ssl_keystore_location:
events.queue.kafka.ssl_truststore_location:
events.queue.msk.batch_size:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -104,6 +105,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -120,6 +122,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -136,6 +139,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -152,6 +156,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -168,14 +173,15 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
),
Arguments.of(
1,
"kafka-security-protocol-empty",
KafkaConfig("localhost:29092", "client_id", 1, 10, 500, 10, null, null, null, null),
KafkaConfig("localhost:29092", "client_id", 1, 10, 500, 10, null, null, true, null, null),
),
Arguments.of(
1,
Expand All @@ -189,6 +195,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.SASL_PLAINTEXT,
null,
true,
null,
null
),
Expand Down Expand Up @@ -222,6 +229,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -239,6 +247,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -256,6 +265,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -273,6 +283,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -290,6 +301,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand All @@ -307,6 +319,7 @@ class EventConfigTest {
10,
KafkaConfig.SecurityProtocol.PLAINTEXT,
null,
true,
null,
null
),
Expand Down
Loading

0 comments on commit 52833bf

Please sign in to comment.