Skip to content

Commit

Permalink
STORM-4075 Supprt mTLS between Storm and ZK
Browse files Browse the repository at this point in the history
  • Loading branch information
purshotam shah committed Sep 4, 2024
1 parent 664142c commit b44c86d
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 0 deletions.
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.zookeeper.auth.user: null
storm.zookeeper.auth.password: null
storm.zookeeper.ssl.enable: false
storm.zookeeper.ssl.hostnameVerification: false
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
Expand Down
5 changes: 5 additions & 0 deletions storm-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 19 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,25 @@ public class Config extends HashMap<String, Object> {
*/
@IsString
public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME = "storm.zookeeper.topology.auth.scheme";

/** Enable SSL/TLS for ZooKeeper client connection. */
@IsBoolean
public static final String ZK_SSL_ENABLE = "storm.zookeeper.ssl.enable";
/** Keystore location for ZooKeeper client connection over SSL. */
@IsString
public static final String ZK_SSL_KEYSTORE_LOCATION = "storm.zookeeper.ssl.keystore.location";
/** Keystore password for ZooKeeper client connection over SSL. */
@IsString
public static final String ZK_SSL_KEYSTORE_PASSWORD = "storm.zookeeper.ssl.keystore.password";
/** Truststore location for ZooKeeper client connection over SSL. */
@IsString
public static final String ZK_SSL_TRUSTSTORE_LOCATION = "storm.zookeeper.ssl.truststore.location";
/** Truststore password for ZooKeeper client connection over SSL. */
@IsString
public static final String ZK_SSL_TRUSTSTORE_PASSWORD = "storm.zookeeper.ssl.truststore.password";
/** Enable or disable hostname verification.*/
@IsBoolean
public static final String ZK_SSL_HOSTNAME_VERIFICATION = "storm.zookeeper.ssl.hostnameVerification";
/**
* The delegate for serializing metadata, should be used for serialized objects stored in zookeeper and on disk. This is NOT used for
* compressing serialized tuples sent between topologies.
Expand Down
119 changes: 119 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/CuratorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.naming.ConfigurationException;
import org.apache.storm.Config;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.shade.org.apache.curator.framework.api.ACLProvider;
import org.apache.storm.shade.org.apache.zookeeper.client.ZKClientConfig;
import org.apache.storm.shade.org.apache.zookeeper.common.ClientX509Util;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CuratorUtils {
public static final Logger LOG = LoggerFactory.getLogger(CuratorUtils.class);
public static final String CLIENT_CNXN
= org.apache.storm.shade.org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();

public static CuratorFramework newCurator(Map<String, Object> conf, List<String> servers, Object port, String root,
List<ACL> defaultAcl) {
Expand Down Expand Up @@ -84,6 +89,120 @@ protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, fina
if (auth != null && auth.scheme != null && auth.payload != null) {
builder.authorization(auth.scheme, auth.payload);
}
boolean sslEnabled = ObjectReader.getBoolean(conf.get(Config.ZK_SSL_ENABLE), false);
if (sslEnabled) {
TruststoreKeystore truststoreKeystore = new TruststoreKeystore(conf);
ZKClientConfig zkClientConfig = new ZKClientConfig();
try {
setSslConfiguration(zkClientConfig, truststoreKeystore);
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
builder.zkClientConfig(zkClientConfig);
}
}

/**
* Configure ZooKeeper Client with SSL/TLS connection.
* @param zkClientConfig ZooKeeper Client configuration
* @param truststoreKeystore The truststore and keystore configs
*/
private static void setSslConfiguration(ZKClientConfig zkClientConfig,
TruststoreKeystore truststoreKeystore) throws ConfigurationException {
setSslConfiguration(zkClientConfig, new ClientX509Util(), truststoreKeystore);
}

private static void setSslConfiguration(ZKClientConfig zkClientConfig,
ClientX509Util x509Util, TruststoreKeystore truststoreKeystore)
throws ConfigurationException {
validateSslConfiguration(truststoreKeystore);
LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption for connecting to the "
+ "ZooKeeper server.");
LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
truststoreKeystore.keystoreLocation,
Config.ZK_SSL_KEYSTORE_LOCATION);
LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
truststoreKeystore.truststoreLocation,
Config.ZK_SSL_TRUSTSTORE_LOCATION);

zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
CLIENT_CNXN);
zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
truststoreKeystore.keystoreLocation);
zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
truststoreKeystore.keystorePassword);
zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
truststoreKeystore.truststoreLocation);
zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
truststoreKeystore.truststorePassword);
zkClientConfig.setProperty(x509Util.getSslHostnameVerificationEnabledProperty(),
truststoreKeystore.hostnameVerification.toString());
}

private static void validateSslConfiguration(TruststoreKeystore truststoreKeystore) throws ConfigurationException {
if (StringUtils.isEmpty(truststoreKeystore.keystoreLocation)) {
throw new ConfigurationException(
"The keystore location parameter is empty for the ZooKeeper client connection.");
}
if (StringUtils.isEmpty(truststoreKeystore.keystorePassword)) {
throw new ConfigurationException(
"The keystore password parameter is empty for the ZooKeeper client connection.");
}
if (StringUtils.isEmpty(truststoreKeystore.truststoreLocation)) {
throw new ConfigurationException(
"The truststore location parameter is empty for the ZooKeeper client connection" + ".");
}
if (StringUtils.isEmpty(truststoreKeystore.truststorePassword)) {
throw new ConfigurationException(
"The truststore password parameter is empty for the ZooKeeper client connection" + ".");
}
}


/**
* Helper class to contain the Truststore/Keystore paths for the ZK client connection over
* SSL/TLS.
*/
public static class TruststoreKeystore {
private final String keystoreLocation;
private final String keystorePassword;
private final String truststoreLocation;
private final String truststorePassword;
private final Boolean hostnameVerification;

/**
* Configuration for the ZooKeeper connection when SSL/TLS is enabled.
*
* @param conf configuration map
*/
public TruststoreKeystore(Map<String, Object> conf) {
keystoreLocation = ObjectReader.getString(conf.get(Config.ZK_SSL_KEYSTORE_LOCATION), "");
keystorePassword = ObjectReader.getString(conf.get(Config.ZK_SSL_KEYSTORE_PASSWORD), "");
truststoreLocation = ObjectReader.getString(conf.get(Config.ZK_SSL_TRUSTSTORE_LOCATION), "");
truststorePassword = ObjectReader.getString(conf.get(Config.ZK_SSL_TRUSTSTORE_PASSWORD), "");
hostnameVerification = ObjectReader.getBoolean(conf.get(Config.ZK_SSL_HOSTNAME_VERIFICATION), true);
}

public String getKeystoreLocation() {
return keystoreLocation;
}

public String getKeystorePassword() {
return keystorePassword;
}

public String getTruststoreLocation() {
return truststoreLocation;
}

public String getTruststorePassword() {
return truststorePassword;
}

public Boolean getHostnameVerification() {
return hostnameVerification;
}
}

public static void testSetupBuilder(CuratorFrameworkFactory.Builder
Expand Down
Loading

0 comments on commit b44c86d

Please sign in to comment.