Skip to content

Commit

Permalink
KAFKA-15853 Refactor KafkaConfig to use PasswordEncoderConfigs (apach…
Browse files Browse the repository at this point in the history
…e#15770)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
OmniaGM authored Apr 21, 2024
1 parent 98548c5 commit 5e96e5c
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 87 deletions.
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ object ConfigCommand extends Logging {
}

private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
encoderConfigs.get(PasswordEncoderConfigs.SECRET)
val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET,
encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
throw new IllegalArgumentException("Password encoder secret not specified"))
PasswordEncoder.encrypting(new Password(encoderSecret),
null,
encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM),
encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH),
encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS))
encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT))
}

/**
Expand All @@ -239,8 +239,8 @@ object ConfigCommand extends Logging {
DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
if (passwordConfigs.nonEmpty) {
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.SECRET),
s"${PasswordEncoderConfigs.SECRET} must be specified to update $passwordConfigs." +
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
s"${PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG} must be specified to update $passwordConfigs." +
" Other password encoder configs like cipher algorithm and iterations may also be specified" +
" to override the default encoding parameters. Password encoder configs will not be persisted" +
" in ZooKeeper."
Expand Down
43 changes: 12 additions & 31 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,6 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"

/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderSecretProp = PasswordEncoderConfigs.SECRET
val PasswordEncoderOldSecretProp = PasswordEncoderConfigs.OLD_SECRET
val PasswordEncoderKeyFactoryAlgorithmProp = PasswordEncoderConfigs.KEYFACTORY_ALGORITHM
val PasswordEncoderCipherAlgorithmProp = PasswordEncoderConfigs.CIPHER_ALGORITHM
val PasswordEncoderKeyLengthProp = PasswordEncoderConfigs.KEY_LENGTH
val PasswordEncoderIterationsProp = PasswordEncoderConfigs.ITERATIONS

/** Internal Configurations **/
val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable"
Expand Down Expand Up @@ -419,17 +411,6 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day."
val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."

/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker."
val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " +
"This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up."
val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
"Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."
val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords."
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."

@nowarn("cat=deprecation")
val configDef = {
import ConfigDef.Importance._
Expand Down Expand Up @@ -765,12 +746,12 @@ object KafkaConfig {
.define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)

/** ********* Password encryption configuration for dynamic configs *********/
.define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc)
.define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc)
.define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc)
.define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, LOW, PasswordEncoderCipherAlgorithmDoc)
.define(PasswordEncoderKeyLengthProp, INT, Defaults.PASSWORD_ENCODER_KEY_LENGTH, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
.define(PasswordEncoderIterationsProp, INT, Defaults.PASSWORD_ENCODER_ITERATIONS, atLeast(1024), LOW, PasswordEncoderIterationsDoc)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC)

/** ********* Raft Quorum Configuration *********/
.define(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QUORUM_VOTERS, new RaftConfig.ControllerQuorumVotersValidator(), HIGH, RaftConfig.QUORUM_VOTERS_DOC)
Expand Down Expand Up @@ -1349,12 +1330,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)

/** ********* Password encryption configuration for dynamic configs *********/
def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp))
def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp))
def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp)
def passwordEncoderKeyFactoryAlgorithm = getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp)
def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp)
def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)
def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG))
def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG)
def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG)
def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG)
def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG)

/** ********* Quota Configuration **************/
val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package kafka.admin

import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.cluster.{Broker, EndPoint}
import kafka.server.{KafkaConfig, QuorumTestHarness}
import kafka.server.QuorumTestHarness
import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, BrokerInfo}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ZooKeeperInternals
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -134,10 +135,10 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {

// Password config update with encoder secret should succeed and encoded password must be stored in ZK
val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads" -> "2")
val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret")
val encoderConfigs = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret")
alterConfigWithZk(configs, Some(brokerId), encoderConfigs)
val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp), "Encoder secret stored in ZooKeeper")
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper")
assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded
val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
Expand All @@ -146,11 +147,11 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {

// Password config update with overrides for encoder parameters
val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2")
val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret",
KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
KafkaConfig.PasswordEncoderIterationsProp -> "1024",
KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> "PBKDF2WithHmacSHA1",
KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
val encoderConfigs2 = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret",
PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG -> "DES/CBC/PKCS5Padding",
PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG -> "1024",
PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG -> "PBKDF2WithHmacSHA1",
PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG -> "64")
alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2)
val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
Expand Down Expand Up @@ -137,7 +137,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.put(KafkaSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update
props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "dynamic-config-secret")
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)

Expand Down Expand Up @@ -1117,7 +1117,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties]
val config = server.config
val oldSecret = "old-dynamic-config-secret"
config.dynamicConfig.staticBrokerConfigs.put(KafkaConfig.PasswordEncoderOldSecretProp, oldSecret)
config.dynamicConfig.staticBrokerConfigs.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, oldSecret)
val passwordConfigs = props.asScala.filter { case (k, _) => DynamicBrokerConfig.isPasswordConfig(k) }
assertTrue(passwordConfigs.nonEmpty, "Password configs not found")
val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret)
Expand Down Expand Up @@ -1595,7 +1595,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val externalListenerPrefix = listenerPrefix(SecureExternal)
val sslStoreProps = new Properties
sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix)
sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
sslStoreProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)

val entityType = ConfigType.BROKER
Expand All @@ -1610,7 +1610,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
sslStoreProps.setProperty(configName, encodedValue)
}
}
sslStoreProps.remove(KafkaConfig.PasswordEncoderSecretProp)
sslStoreProps.remove(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
adminZkClient.changeConfigs(entityType, entityName, sslStoreProps)

val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{Defaults, KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
Expand Down Expand Up @@ -330,7 +331,7 @@ class DynamicBrokerConfigTest {

private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret")
val config = KafkaConfig(configProps)
config.dynamicConfig.initialize(None, None)

Expand Down Expand Up @@ -381,7 +382,7 @@ class DynamicBrokerConfigTest {
def testPasswordConfigEncryption(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val configWithoutSecret = KafkaConfig(props)
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val configWithSecret = KafkaConfig(props)
val dynamicProps = new Properties
dynamicProps.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "myLoginModule required;")
Expand All @@ -402,7 +403,7 @@ class DynamicBrokerConfigTest {
def testPasswordConfigEncoderSecretChange(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;")
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val config = KafkaConfig(props)
config.dynamicConfig.initialize(None, None)
val dynamicProps = new Properties
Expand All @@ -421,14 +422,14 @@ class DynamicBrokerConfigTest {
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret")
props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret")
val newConfigWithNewAndOldSecret = KafkaConfig(props)
newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with new secret alone should revert to static password config since dynamic config cannot be decoded
props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret")
val newConfigWithNewSecret = KafkaConfig(props)
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
Expand Down
Loading

0 comments on commit 5e96e5c

Please sign in to comment.