Skip to content

Commit

Permalink
[ANCHOR-801] Add Kafka SASL-SSL authentication support (#1498)
Browse files Browse the repository at this point in the history
### Description

- Support Kafka SASL-SSL authentication
- Add `docker-compose-kafka-sasl-ssl.yaml.yaml` to run Kafka in SASL_SSL
mode for testing.

### Context

- Answer the requests to support SASL-SSL

### Testing

- `./gradlew test`

### Documentation

N/A

### Known limitations

N/A
  • Loading branch information
lijamie98 authored Sep 13, 2024
1 parent 609b2c4 commit 5725e06
Show file tree
Hide file tree
Showing 25 changed files with 318 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Docker - Run Dev Stack - Kafka, Postgres, SEP24 Reference UI" type="JetRunConfigurationType">
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="MAIN_CLASS_NAME" value="org.stellar.anchor.platform.run_profiles.RunDockerDevStack" />
<module name="java-stellar-anchor-sdk.service-runner.main" />
<shortenClasspath name="ARGS_FILE" />
<extension name="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
Expand Down
25 changes: 25 additions & 0 deletions .run/Test Profile_ kafka-sasl-ssl.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Test Profile: kafka-sasl-ssl" type="JetRunConfigurationType" folderName="Run Test Profile">
<envs>
<env name="app.enableTest" value="true" />
<env name="KT_REFERENCE_SERVER_CONFIG" value="service-runner/src/main/resources/config/reference-config.yaml" />
<env name="TEST_PROFILE_NAME" value="kafka-sasl-ssl" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.stellar.anchor.platform.run_profiles.RunTestProfile" />
<module name="java-stellar-anchor-sdk.service-runner.main" />
<shortenClasspath name="ARGS_FILE" />
<extension name="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,35 @@ public class KafkaConfig {
/** Determines the maximum amount of time to wait for the batch to be filled. */
int pollTimeoutSeconds;

/** The security protocol used to communicate with brokers. */
SecurityProtocol securityProtocol;

/** The SASL mechanism used for authentication. */
SaslMechanism saslMechanism;

/** the SSL keystore location. */
String sslKeystoreLocation;

/** the SSL truststore location. */
String sslTruststoreLocation;

public enum SecurityProtocol {
PLAINTEXT,
SASL_PLAINTEXT
SASL_PLAINTEXT,
SASL_SSL
}

public enum SaslMechanism {
PLAIN
}
PLAIN("PLAIN");

/** The security protocol used to communicate with brokers. */
SecurityProtocol securityProtocol;
String value;

/** The SASL mechanism used for authentication. */
SaslMechanism saslMechanism;
SaslMechanism(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ public class MskConfig extends KafkaConfig {
int batchSize,
int pollTimeoutSeconds,
SecurityProtocol securityProtocol,
SaslMechanism saslMechanism) {
SaslMechanism saslMechanism,
String sslKeystoreLocation,
String sslTruststoreLocation) {

super(
bootstrapServer,
clientId,
Expand All @@ -28,7 +31,9 @@ public class MskConfig extends KafkaConfig {
batchSize,
pollTimeoutSeconds,
securityProtocol,
saslMechanism);
saslMechanism,
sslKeystoreLocation,
sslTruststoreLocation);
this.useIAM = useIAM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class PropertySecretConfig implements SecretConfig {
"secret.events.queue.kafka.username";
public static final String SECRET_EVENTS_QUEUE_KAFKA_PASSWORD =
"secret.events.queue.kafka.password";
public static final String SECRET_SSL_KEYSTORE_PASSWORD = "secret.ssl.keystore.password";
public static final String SECRET_SSL_KEY_PASSWORD = "secret.ssl.key.password";
public static final String SECRET_SSL_TRUSTSTORE_PASSWORD = "secret.ssl.truststore.password";

public String getSep6MoreInfoUrlJwtSecret() {
return SecretManager.getInstance().get(SECRET_SEP_6_MORE_INFO_URL_JWT_SECRET);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ public class SecretManager
PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_USERNAME,
PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_PASSWORD,
PropertyCustodySecretConfig.SECRET_FIREBLOCKS_SECRET_KEY,
PropertyCustodySecretConfig.SECRET_FIREBLOCKS_API_KEY);
PropertyCustodySecretConfig.SECRET_FIREBLOCKS_API_KEY,
PropertySecretConfig.SECRET_SSL_KEYSTORE_PASSWORD,
PropertySecretConfig.SECRET_SSL_KEY_PASSWORD,
PropertySecretConfig.SECRET_SSL_TRUSTSTORE_PASSWORD);

final Properties props = new Properties();

Expand All @@ -42,6 +45,10 @@ public static SecretManager getInstance() {
return secretManager;
}

public static String secret(String key) {
return getInstance().get(key);
}

@Override
public void initialize(@NotNull ConfigurableApplicationContext applicationContext) {
info("Secret manager started.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.stellar.anchor.platform.event;

import lombok.SneakyThrows;
import org.apache.commons.lang3.NotImplementedException;
import org.stellar.anchor.config.event.EventConfig;
import org.stellar.anchor.event.EventService;
Expand All @@ -13,6 +14,7 @@ public DefaultEventService(EventConfig eventConfig) {
}

@Override
@SneakyThrows
public Session createSession(String sessionName, EventQueue eventQueue) {
if (eventConfig.isEnabled()) {
switch (eventConfig.getQueue().getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.config.SslConfigs.*;
import static org.stellar.anchor.platform.config.PropertySecretConfig.*;
import static org.stellar.anchor.platform.configurator.SecretManager.*;
import static org.stellar.anchor.platform.utils.ResourceHelper.findResourceFile;
import static org.stellar.anchor.platform.utils.ResourceHelper.resource;
import static org.stellar.anchor.util.StringHelper.isEmpty;

import io.micrometer.core.instrument.Metrics;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -26,23 +33,39 @@
import org.stellar.anchor.event.EventService;
import org.stellar.anchor.event.EventService.EventQueue;
import org.stellar.anchor.platform.config.KafkaConfig;
import org.stellar.anchor.platform.config.PropertySecretConfig;
import org.stellar.anchor.platform.configurator.SecretManager;
import org.stellar.anchor.util.GsonUtils;
import org.stellar.anchor.util.Log;

public class KafkaSession implements EventService.Session {

final KafkaConfig kafkaConfig;
final String sessionName;
final String topic;
Producer<String, String> producer = null;
Consumer<String, String> consumer = null;
String sslKeystoreLocation;
String sslTruststoreLocation;

KafkaSession(KafkaConfig kafkaConfig, String sessionName, EventQueue queue) {
KafkaSession(KafkaConfig kafkaConfig, String sessionName, EventQueue queue) throws IOException {
this.kafkaConfig = kafkaConfig;
this.sessionName = sessionName;
this.topic = queue.name();

if (kafkaConfig.getSecurityProtocol() == KafkaConfig.SecurityProtocol.SASL_SSL) {
// If the keystore and truststore files exist, use them, otherwise, use the resources
if (new File(kafkaConfig.getSslKeystoreLocation()).exists()) {
sslKeystoreLocation = kafkaConfig.getSslKeystoreLocation();
} else {
sslKeystoreLocation =
findResourceFile(resource(kafkaConfig.getSslKeystoreLocation())).getAbsolutePath();
}

if (new File(kafkaConfig.getSslTruststoreLocation()).exists()) {
sslTruststoreLocation = kafkaConfig.getSslTruststoreLocation();
} else {
sslTruststoreLocation =
findResourceFile(resource(kafkaConfig.getSslTruststoreLocation())).getAbsolutePath();
}
}
}

@Override
Expand Down Expand Up @@ -174,37 +197,53 @@ Consumer<String, String> createConsumer() throws InvalidConfigException {
void configureAuth(Properties props) throws InvalidConfigException {
switch (kafkaConfig.getSecurityProtocol()) {
case SASL_PLAINTEXT:
if (isEmpty(
SecretManager.getInstance()
.get(PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_USERNAME))) {
case SASL_SSL:
// Check if the username and password are set
if (isEmpty(secret(SECRET_EVENTS_QUEUE_KAFKA_USERNAME))) {
String msg =
PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_USERNAME
SECRET_EVENTS_QUEUE_KAFKA_USERNAME
+ " is not set. Please provide the Kafka username.";
Log.error(msg);
throw new InvalidConfigException(msg);
}
if (isEmpty(
SecretManager.getInstance()
.get(PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_PASSWORD))) {
if (isEmpty(secret(SECRET_EVENTS_QUEUE_KAFKA_PASSWORD))) {
String msg =
PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_PASSWORD
SECRET_EVENTS_QUEUE_KAFKA_PASSWORD
+ " is not set. Please provide the Kafka password.";
Log.error(msg);
throw new InvalidConfigException(msg);
}

props.put(SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SASL_MECHANISM, "PLAIN");
// Set the SASL login information
props.put(
"sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ SecretManager.getInstance()
.get(PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_USERNAME)
+ secret(SECRET_EVENTS_QUEUE_KAFKA_USERNAME)
+ "\" password=\""
+ SecretManager.getInstance()
.get(PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_PASSWORD)
+ secret(SECRET_EVENTS_QUEUE_KAFKA_PASSWORD)
+ "\";");

break;
}

switch (kafkaConfig.getSecurityProtocol()) {
case SASL_PLAINTEXT:
props.put(SECURITY_PROTOCOL_CONFIG, kafkaConfig.getSecurityProtocol().name());
props.put(SASL_MECHANISM, kafkaConfig.getSaslMechanism().getValue());
break;
case SASL_SSL:
props.put(SECURITY_PROTOCOL_CONFIG, kafkaConfig.getSecurityProtocol().name());
props.put(SASL_MECHANISM, kafkaConfig.getSaslMechanism().getValue());
props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);

if (!isEmpty(secret(SECRET_SSL_KEYSTORE_PASSWORD)))
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, secret(SECRET_SSL_KEYSTORE_PASSWORD));
if (!isEmpty(secret(SECRET_SSL_KEY_PASSWORD)))
props.put(SSL_KEY_PASSWORD_CONFIG, secret(SECRET_SSL_KEY_PASSWORD));
if (!isEmpty(secret(SECRET_SSL_TRUSTSTORE_PASSWORD)))
props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, secret(SECRET_SSL_TRUSTSTORE_PASSWORD));

case PLAINTEXT:
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.stellar.anchor.platform.utils;

import java.io.File;
import java.io.IOException;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;

public class ResourceHelper {
static final ResourceLoader resourceLoader = new DefaultResourceLoader();

public static Resource resource(String resource) {
return resourceLoader.getResource(resource);
}

public static File findResourceFile(Resource resource) throws IOException {
if (resource.exists()) {
return resource.getFile();
}
throw new IOException("Resource not found: " + resource.getFilename());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -681,15 +681,32 @@ events:
poll_timeout_seconds: 60
# The security protocol used to communicate with the brokers.
# `security_protocol` can be one of the following:
# `PLAINTEXT`: no encryption
# `SASL_PLAINTEXT`: SASL authentication with no encryption
# If SASL_PLAINTEXT is used, the SECRET_EVENTS_QUEUE_KAFKA_USERNAME and SECRET_EVENTS_QUEUE_KAFKA_PASSWORD
# environment variables must be set.
# `PLAINTEXT`: no encryption
# `SASL_PLAINTEXT`: SASL authentication with no encryption
# `SASL_SSL`: SASL authentication with SSL encryption
#
# If SASL_PLAINTEXT or SASL_SSL is used, the following secrets must be set
# - SECRET_EVENTS_QUEUE_KAFKA_USERNAME
# - SECRET_EVENTS_QUEUE_KAFKA_PASSWORD
#
# If the SASL_SSL is used, the following values must be set
# - ssl_keystore_location
# - ssl_keystore_password
# the following secrets must be set:
# - SECRET_SSL_KEYSTORE_PASSWORD
# - SECRET_SSL_TRUSTSTORE_PASSWORD
# - SECRET_SSL_KEY_PASSWORD
#
security_protocol: PLAINTEXT
# The SASL mechanism used for authentication when `security_protocol` is set to `SASL_PLAINTEXT`.
# `sasl_mechanism` can be one of the following:
# `PLAIN`: PLAIN SASL mechanism
sasl_mechanism:
# The SASL mechanism used for authentication and must match what is used on the broker.
# `sasl_mechanism` can be one of the following supported mechanisms:
# `PLAIN`: PLAIN SASL mechanism
sasl_mechanism: PLAIN
# The SSL keystore location. The file path to the keystore file (jks).
ssl_keystore_location:
# The SSL truststore location. The file path to the truststore file (jks).
ssl_truststore_location:

# If the value of `publisher.type` is "sqs", the `events.publisher.sqs` field must be defined.
sqs:
# Use IAM authentication for AWS MSK or AWS SQS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ events.queue.kafka.security_protocol:
events.queue.kafka.sasl_mechanism:
events.queue.kafka.username:
events.queue.kafka.password:
events.queue.kafka.ssl_keystore_location:
events.queue.kafka.ssl_truststore_location:
events.queue.msk.batch_size:
events.queue.msk.bootstrap_server:
events.queue.msk.client_id:
Expand Down
Loading

0 comments on commit 5725e06

Please sign in to comment.