Skip to content

Commit

Permalink
STORM-4075 Supprt mTLS between Storm and ZK (#3692)
Browse files Browse the repository at this point in the history
* STORM-4075 Supprt mTLS between Storm and ZK

* STORM-4075 Supprt mTLS between Storm and ZK - review comments

---------

Co-authored-by: purshotam shah <[email protected]>
  • Loading branch information
purushah and purshotam shah authored Oct 1, 2024
1 parent 274fb73 commit 4508edc
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 0 deletions.
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.zookeeper.auth.user: null
storm.zookeeper.auth.password: null
storm.zookeeper.ssl.enable: false
storm.zookeeper.ssl.hostnameVerification: false
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
Expand Down
5 changes: 5 additions & 0 deletions storm-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 19 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,25 @@ public class Config extends HashMap<String, Object> {
*/
@IsString
public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME = "storm.zookeeper.topology.auth.scheme";

/** Enable SSL/TLS for ZooKeeper client connection. */
@IsBoolean
public static final String ZK_SSL_ENABLE = "storm.zookeeper.ssl.enable";
/** Keystore location for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PATH = "storm.zookeeper.ssl.keystore.path";
/** Keystore password for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD = "storm.zookeeper.ssl.keystore.password";
/** Truststore location for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH = "storm.zookeeper.ssl.truststore.path";
/** Truststore password for ZooKeeper client connection over SSL. */
@IsString
public static final String STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD = "storm.zookeeper.ssl.truststore.password";
/** Enable or disable hostname verification.*/
@IsBoolean
public static final String STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION = "storm.zookeeper.ssl.hostnameVerification";
/**
* The delegate for serializing metadata, should be used for serialized objects stored in zookeeper and on disk. This is NOT used for
* compressing serialized tuples sent between topologies.
Expand Down
118 changes: 118 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.naming.ConfigurationException;
import org.apache.storm.Config;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.shade.org.apache.curator.framework.api.ACLProvider;
import org.apache.storm.shade.org.apache.zookeeper.client.ZKClientConfig;
import org.apache.storm.shade.org.apache.zookeeper.common.ClientX509Util;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CuratorUtils {
public static final Logger LOG = LoggerFactory.getLogger(CuratorUtils.class);
public static final String CLIENT_CNXN
= org.apache.storm.shade.org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();

public static CuratorFramework newCurator(Map<String, Object> conf, List<String> servers, Object port, String root,
List<ACL> defaultAcl) {
Expand Down Expand Up @@ -84,6 +89,119 @@ protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, fina
if (auth != null && auth.scheme != null && auth.payload != null) {
builder.authorization(auth.scheme, auth.payload);
}
boolean sslEnabled = ObjectReader.getBoolean(conf.get(Config.ZK_SSL_ENABLE), false);
if (sslEnabled) {
SslConf sslConf = new SslConf(conf);
ZKClientConfig zkClientConfig = new ZKClientConfig();
try {
setSslConfiguration(zkClientConfig, new ClientX509Util(), sslConf);
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
builder.zkClientConfig(zkClientConfig);
}
}

/**
* Configure ZooKeeper Client with SSL/TLS connection.
* @param zkClientConfig ZooKeeper Client configuration
* @param x509Util The X509 utility
* @param sslConf The truststore and keystore configs
*/
private static void setSslConfiguration(ZKClientConfig zkClientConfig,
ClientX509Util x509Util, SslConf sslConf)
throws ConfigurationException {
validateSslConfiguration(sslConf);
LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption for connecting to the "
+ "ZooKeeper server.");
LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
sslConf.keystoreLocation,
Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH);
LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
sslConf.truststoreLocation,
Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH);

zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
CLIENT_CNXN);
zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
sslConf.keystoreLocation);
zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
sslConf.keystorePassword);
zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
sslConf.truststoreLocation);
zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
sslConf.truststorePassword);
zkClientConfig.setProperty(x509Util.getSslHostnameVerificationEnabledProperty(),
sslConf.hostnameVerification.toString());
}

private static void validateSslConfiguration(SslConf sslConf) throws ConfigurationException {
if (StringUtils.isEmpty(sslConf.getKeystoreLocation())) {
throw new ConfigurationException(
"The keystore location parameter is empty for the ZooKeeper client connection.");
}
if (StringUtils.isEmpty(sslConf.getKeystorePassword())) {
throw new ConfigurationException(
"The keystore password parameter is empty for the ZooKeeper client connection.");
}
if (StringUtils.isEmpty(sslConf.getTruststoreLocation())) {
throw new ConfigurationException(
"The truststore location parameter is empty for the ZooKeeper client connection" + ".");
}
if (StringUtils.isEmpty(sslConf.getTruststorePassword())) {
throw new ConfigurationException(
"The truststore password parameter is empty for the ZooKeeper client connection" + ".");
}
}

public static SslConf getSslConf(Map<String, Object> conf) {
return new SslConf(conf);
}
/**
* Helper class to contain the Truststore/Keystore paths for the ZK client connection over
* SSL/TLS.
*/

static final class SslConf {
private final String keystoreLocation;
private final String keystorePassword;
private final String truststoreLocation;
private final String truststorePassword;
private final Boolean hostnameVerification;

/**
* Configuration for the ZooKeeper connection when SSL/TLS is enabled.
*
* @param conf configuration map
*/
private SslConf(Map<String, Object> conf) {
keystoreLocation = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PATH), "");
keystorePassword = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_KEYSTORE_PASSWORD), "");
truststoreLocation = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PATH), "");
truststorePassword = ObjectReader.getString(conf.get(Config.STORM_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD), "");
hostnameVerification = ObjectReader.getBoolean(conf.get(Config.STORM_ZOOKEEPER_SSL_HOSTNAME_VERIFICATION), true);
}

public String getKeystoreLocation() {
return keystoreLocation;
}

public String getKeystorePassword() {
return keystorePassword;
}

public String getTruststoreLocation() {
return truststoreLocation;
}

public String getTruststorePassword() {
return truststorePassword;
}

public Boolean getHostnameVerification() {
return hostnameVerification;
}
}

public static void testSetupBuilder(CuratorFrameworkFactory.Builder
Expand Down
Loading

0 comments on commit 4508edc

Please sign in to comment.