diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index a10722f000f11..644043f761036 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -212,14 +212,14 @@ object ConfigCommand extends Logging { } private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = { - encoderConfigs.get(PasswordEncoderConfigs.SECRET) - val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET, + encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG) + val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, throw new IllegalArgumentException("Password encoder secret not specified")) PasswordEncoder.encrypting(new Password(encoderSecret), null, - encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM), - encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH), - encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS)) + encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT), + encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT), + encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT)) } /** @@ -239,8 +239,8 @@ object ConfigCommand extends Logging { DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig) val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig) if (passwordConfigs.nonEmpty) { - require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.SECRET), - s"${PasswordEncoderConfigs.SECRET} must be specified to update $passwordConfigs." + + require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), + s"${PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG} must be specified to update $passwordConfigs." + " Other password encoder configs like cipher algorithm and iterations may also be specified" + " to override the default encoding parameters. Password encoder configs will not be persisted" + " in ZooKeeper." diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9cee2aa61a93a..217303d5ee8c3 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -203,14 +203,6 @@ object KafkaConfig { val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms" val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms" - /** ********* Password encryption configuration for dynamic configs *********/ - val PasswordEncoderSecretProp = PasswordEncoderConfigs.SECRET - val PasswordEncoderOldSecretProp = PasswordEncoderConfigs.OLD_SECRET - val PasswordEncoderKeyFactoryAlgorithmProp = PasswordEncoderConfigs.KEYFACTORY_ALGORITHM - val PasswordEncoderCipherAlgorithmProp = PasswordEncoderConfigs.CIPHER_ALGORITHM - val PasswordEncoderKeyLengthProp = PasswordEncoderConfigs.KEY_LENGTH - val PasswordEncoderIterationsProp = PasswordEncoderConfigs.ITERATIONS - /** Internal Configurations **/ val UnstableApiVersionsEnableProp = "unstable.api.versions.enable" val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable" @@ -419,17 +411,6 @@ object KafkaConfig { val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day." val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens." - /** ********* Password encryption configuration for dynamic configs *********/ - val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker." - val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " + - "This is required only when the secret is updated. If specified, all dynamically encoded passwords are " + - s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up." - val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " + - "Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise." - val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords." - val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords." - val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords." - @nowarn("cat=deprecation") val configDef = { import ConfigDef.Importance._ @@ -765,12 +746,12 @@ object KafkaConfig { .define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc) /** ********* Password encryption configuration for dynamic configs *********/ - .define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc) - .define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc) - .define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc) - .define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, LOW, PasswordEncoderCipherAlgorithmDoc) - .define(PasswordEncoderKeyLengthProp, INT, Defaults.PASSWORD_ENCODER_KEY_LENGTH, atLeast(8), LOW, PasswordEncoderKeyLengthDoc) - .define(PasswordEncoderIterationsProp, INT, Defaults.PASSWORD_ENCODER_ITERATIONS, atLeast(1024), LOW, PasswordEncoderIterationsDoc) + .define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC) + .define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC) + .define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC) + .define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC) + .define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC) + .define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC) /** ********* Raft Quorum Configuration *********/ .define(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QUORUM_VOTERS, new RaftConfig.ControllerQuorumVotersValidator(), HIGH, RaftConfig.QUORUM_VOTERS_DOC) @@ -1349,12 +1330,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp) /** ********* Password encryption configuration for dynamic configs *********/ - def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp)) - def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp)) - def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp) - def passwordEncoderKeyFactoryAlgorithm = getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp) - def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp) - def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp) + def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)) + def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG)) + def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG) + def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG) + def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG) + def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG) /** ********* Quota Configuration **************/ val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp) diff --git a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala index abc8881f69968..c8c5450c1a214 100644 --- a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala @@ -18,12 +18,13 @@ package kafka.admin import kafka.admin.ConfigCommand.ConfigCommandOptions import kafka.cluster.{Broker, EndPoint} -import kafka.server.{KafkaConfig, QuorumTestHarness} +import kafka.server.QuorumTestHarness import kafka.utils.{Exit, Logging} import kafka.zk.{AdminZkClient, BrokerInfo} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.config.ZooKeeperInternals import org.junit.jupiter.api.Assertions._ @@ -134,10 +135,10 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { // Password config update with encoder secret should succeed and encoded password must be stored in ZK val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads" -> "2") - val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret") + val encoderConfigs = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret") alterConfigWithZk(configs, Some(brokerId), encoderConfigs) val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId) - assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp), "Encoder secret stored in ZooKeeper") + assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper") assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password") val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs) @@ -146,11 +147,11 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { // Password config update with overrides for encoder parameters val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2") - val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret", - KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding", - KafkaConfig.PasswordEncoderIterationsProp -> "1024", - KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> "PBKDF2WithHmacSHA1", - KafkaConfig.PasswordEncoderKeyLengthProp -> "64") + val encoderConfigs2 = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret", + PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG -> "DES/CBC/PKCS5Padding", + PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG -> "1024", + PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG -> "PBKDF2WithHmacSHA1", + PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG -> "64") alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2) val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId) val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password") diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index e881d427c73a1..f588827a0ce2d 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs -import org.apache.kafka.security.PasswordEncoder +import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs} import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ZkConfigs} import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs} import org.apache.kafka.server.metrics.KafkaYammerMetrics @@ -137,7 +137,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(KafkaSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(",")) props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads - props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") + props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "dynamic-config-secret") props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString) props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString) @@ -1117,7 +1117,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties] val config = server.config val oldSecret = "old-dynamic-config-secret" - config.dynamicConfig.staticBrokerConfigs.put(KafkaConfig.PasswordEncoderOldSecretProp, oldSecret) + config.dynamicConfig.staticBrokerConfigs.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, oldSecret) val passwordConfigs = props.asScala.filter { case (k, _) => DynamicBrokerConfig.isPasswordConfig(k) } assertTrue(passwordConfigs.nonEmpty, "Password configs not found") val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret) @@ -1595,7 +1595,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val externalListenerPrefix = listenerPrefix(SecureExternal) val sslStoreProps = new Properties sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix) - sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull) + sslStoreProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, kafkaConfig.passwordEncoderSecret.map(_.value).orNull) zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) val entityType = ConfigType.BROKER @@ -1610,7 +1610,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup sslStoreProps.setProperty(configName, encodedValue) } } - sslStoreProps.remove(KafkaConfig.PasswordEncoderSecretProp) + sslStoreProps.remove(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG) adminZkClient.changeConfigs(entityType, entityName, sslStoreProps) val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 51a5bce13ca7a..ebf3e2328a473 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.config.{Defaults, KafkaSecurityConfigs, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig @@ -330,7 +331,7 @@ class DynamicBrokerConfigTest { private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = { val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) - configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret") + configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret") val config = KafkaConfig(configProps) config.dynamicConfig.initialize(None, None) @@ -381,7 +382,7 @@ class DynamicBrokerConfigTest { def testPasswordConfigEncryption(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val configWithoutSecret = KafkaConfig(props) - props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret") + props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret") val configWithSecret = KafkaConfig(props) val dynamicProps = new Properties dynamicProps.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "myLoginModule required;") @@ -402,7 +403,7 @@ class DynamicBrokerConfigTest { def testPasswordConfigEncoderSecretChange(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;") - props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret") + props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret") val config = KafkaConfig(props) config.dynamicConfig.initialize(None, None) val dynamicProps = new Properties @@ -421,14 +422,14 @@ class DynamicBrokerConfigTest { assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) // New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig - props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret") - props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret") + props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret") + props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret") val newConfigWithNewAndOldSecret = KafkaConfig(props) newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) // New config with new secret alone should revert to static password config since dynamic config cannot be decoded - props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret") + props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret") val newConfigWithNewSecret = KafkaConfig(props) newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 173ad75d7d37a..7634694dc5eea 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -39,9 +39,10 @@ import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1} -import org.apache.kafka.server.config.{KafkaSecurityConfigs, ServerTopicConfigSynonyms, ZkConfigs, ReplicationConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{KafkaSecurityConfigs, ReplicationConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} import org.junit.jupiter.api.function.Executable @@ -980,12 +981,12 @@ class KafkaConfigTest { case KafkaSecurityConfigs.SECURITY_PROVIDER_CLASS_CONFIG => // Password encoder configs - case KafkaConfig.PasswordEncoderSecretProp => - case KafkaConfig.PasswordEncoderOldSecretProp => - case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp => - case KafkaConfig.PasswordEncoderCipherAlgorithmProp => - case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") - case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") + case PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG => + case PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG => + case PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG => + case PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG => + case PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") + case PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") //delegation token configs case KafkaConfig.DelegationTokenSecretKeyAliasProp => // ignore diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala index 3e4652c6180c4..5ffce65a01ecf 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala @@ -20,7 +20,7 @@ import kafka.server.{KafkaConfig, QuorumTestHarness} import kafka.zk.ZkMigrationClient import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState -import org.apache.kafka.security.PasswordEncoder +import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs} import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{BeforeEach, TestInfo} @@ -42,7 +42,7 @@ class ZkMigrationTestHarness extends QuorumTestHarness { val encoder: PasswordEncoder = { val encoderProps = new Properties() encoderProps.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:1234") // Get around the config validation - encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the + encoderProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, SECRET) // Zk secret to encrypt the val encoderConfig = new KafkaConfig(encoderProps) PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get, encoderConfig.passwordEncoderKeyFactoryAlgorithm, diff --git a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java index e5c9bc13fe7dd..f26d8919b1aaf 100644 --- a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java @@ -18,13 +18,27 @@ public class PasswordEncoderConfigs { - public static final String SECRET = "password.encoder.secret"; - public static final String OLD_SECRET = "password.encoder.old.secret"; - public static final String KEYFACTORY_ALGORITHM = "password.encoder.keyfactory.algorithm"; - public static final String CIPHER_ALGORITHM = "password.encoder.cipher.algorithm"; - public static final String DEFAULT_CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding"; - public static final String KEY_LENGTH = "password.encoder.key.length"; - public static final int DEFAULT_KEY_LENGTH = 128; - public static final String ITERATIONS = "password.encoder.iterations"; - public static final int DEFAULT_ITERATIONS = 4096; + public static final String PASSWORD_ENCODER_SECRET_CONFIG = "password.encoder.secret"; + public static final String PASSWORD_ENCODER_SECRET_DOC = "The secret used for encoding dynamically configured passwords for this broker."; + + public static final String PASSWORD_ENCODER_OLD_SECRET_CONFIG = "password.encoder.old.secret"; + public static final String PASSWORD_ENCODER_OLD_SECRET_DOC = "The old secret that was used for encoding dynamically configured passwords. " + + "This is required only when the secret is updated. If specified, all dynamically encoded passwords are " + + "decoded using this old secret and re-encoded using " + PASSWORD_ENCODER_SECRET_CONFIG + " when broker starts up."; + + public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG = "password.encoder.keyfactory.algorithm"; + public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " + + "Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."; + + public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG = "password.encoder.cipher.algorithm"; + public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC = "The Cipher algorithm used for encoding dynamically configured passwords."; + public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT = "AES/CBC/PKCS5Padding"; + + public static final String PASSWORD_ENCODER_KEY_LENGTH_CONFIG = "password.encoder.key.length"; + public static final String PASSWORD_ENCODER_KEY_LENGTH_DOC = "The key length used for encoding dynamically configured passwords."; + public static final int PASSWORD_ENCODER_KEY_LENGTH_DEFAULT = 128; + + public static final String PASSWORD_ENCODER_ITERATIONS_CONFIG = "password.encoder.iterations"; + public static final String PASSWORD_ENCODER_ITERATIONS_DOC = "The iteration count used for encoding dynamically configured passwords."; + public static final int PASSWORD_ENCODER_ITERATIONS_DEFAULT = 4096; } diff --git a/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java b/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java index c61715b02cd1e..a513ed25c28cd 100644 --- a/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java +++ b/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java @@ -36,9 +36,9 @@ class PasswordEncoderTest { public void testEncodeDecode() throws GeneralSecurityException { PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), null, - PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, - PasswordEncoderConfigs.DEFAULT_KEY_LENGTH, - PasswordEncoderConfigs.DEFAULT_ITERATIONS); + PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, + PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, + PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT); String password = "test-password"; String encoded = encoder.encode(new Password(password)); Map encodedMap = Csv.parseCsvMap(encoded); @@ -96,10 +96,10 @@ public void testEncodeDecodeAlgorithms() throws GeneralSecurityException { verifyEncodeDecode(null, "AES/CBC/PKCS5Padding", 128); verifyEncodeDecode(null, "AES/CFB/PKCS5Padding", 128); verifyEncodeDecode(null, "AES/OFB/PKCS5Padding", 128); - verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128); + verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128); verifyEncodeDecode(null, "AES/GCM/NoPadding", 128); - verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128); - verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128); + verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128); + verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128); } private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyLength) throws GeneralSecurityException { @@ -107,7 +107,7 @@ private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyL keyFactoryAlg, cipherAlg, keyLength, - PasswordEncoderConfigs.DEFAULT_ITERATIONS); + PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT); String password = "test-password"; String encoded = encoder.encode(new Password(password)); verifyEncodedPassword(encoder, password, encoded); diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java index 9337316f5eb08..70ea34b64efb5 100644 --- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java +++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.raft.RaftConfig; -import org.apache.kafka.security.PasswordEncoderConfigs; import java.util.Arrays; import java.util.List; @@ -119,10 +118,6 @@ public class Defaults { public static final long DELEGATION_TOKEN_EXPIRY_TIME_MS = 24 * 60 * 60 * 1000L; public static final long DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS = 1 * 60 * 60 * 1000L; - /** ********* Password Encryption Configuration for Dynamic Configs *********/ - public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM = PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM; - public static final int PASSWORD_ENCODER_KEY_LENGTH = PasswordEncoderConfigs.DEFAULT_KEY_LENGTH; - public static final int PASSWORD_ENCODER_ITERATIONS = PasswordEncoderConfigs.DEFAULT_ITERATIONS; /** ********* Raft Quorum Configuration *********/ public static final List QUORUM_VOTERS = RaftConfig.DEFAULT_QUORUM_VOTERS;