From 978e2dbb67cf91243e3cadf56458765ee2236ff6 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Sat, 23 Dec 2023 21:36:41 +0000 Subject: [PATCH 01/11] First commit Signed-off-by: dttung2905 --- pkg/scalers/kafka_scaler.go | 358 ++++++++++++++++++------------------ pkg/scalers/scaler_test.go | 2 +- 2 files changed, 178 insertions(+), 182 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 86acc802d81..4a7d69a9cf9 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -24,7 +24,7 @@ import ( "errors" "fmt" "os" - "strconv" + "reflect" "strings" "sync" @@ -158,23 +158,14 @@ func NewKafkaScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { meta.saslType = KafkaSASLTypeNone - var saslAuthType string - switch { - case config.TriggerMetadata["sasl"] != "": - saslAuthType = config.TriggerMetadata["sasl"] - default: - saslAuthType = "" - } - if val, ok := config.AuthParams["sasl"]; ok { - if saslAuthType != "" { - return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together") - } - saslAuthType = val + saslAuthType, err := getParameterFromConfigV2(config, "sasl", true, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err } if saslAuthType != "" { - saslAuthType = strings.TrimSpace(saslAuthType) - mode := kafkaSaslType(saslAuthType) + saslAuthType = strings.TrimSpace(saslAuthType.(string)) + mode := kafkaSaslType(saslAuthType.(string)) switch { case mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer: @@ -194,30 +185,18 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat meta.enableTLS = false enableTLS := false - if val, ok := config.TriggerMetadata["tls"]; ok { - switch val { - case stringEnable: - enableTLS = true - case stringDisable: - enableTLS = false - default: - return fmt.Errorf("error incorrect TLS value given, got %s", val) - } - } - - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - if enableTLS { - return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together") - } - switch val { - case stringEnable: - enableTLS = true - case stringDisable: - enableTLS = false - default: - return fmt.Errorf("error incorrect TLS value given, got %s", val) - } + tlsString, err := getParameterFromConfigV2(config, "tls", true, false, false, true, "", reflect.TypeOf("")) + if err != nil { + return fmt.Errorf("error incorrect TLS value given. %s", err.Error()) + } + tlsString = strings.TrimSpace(tlsString.(string)) + switch tlsString.(string) { + case stringEnable: + enableTLS = true + case stringDisable: + enableTLS = false + default: + return fmt.Errorf("error incorrect TLS value given, got %s", tlsString) } if enableTLS { @@ -227,66 +206,96 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat return nil } -func parseTLS(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { +func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { + certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return err + } + keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return err + } + + if certGiven != "" && keyGiven == "" { return errors.New("key must be provided with cert") } - if keyGiven && !certGiven { + if keyGiven != "" && certGiven == "" { return errors.New("cert must be provided with key") } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - meta.unsafeSsl = defaultUnsafeSsl - if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { - unsafeSsl, err := strconv.ParseBool(val) - if err != nil { - return fmt.Errorf("error parsing unsafeSsl: %w", err) - } - meta.unsafeSsl = unsafeSsl + ca, err := getParameterFromConfigV2(config, "ca", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return err + } + meta.ca = ca.(string) + cert, err := getParameterFromConfigV2(config, "cert", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return err + } + meta.cert = cert.(string) + key, err := getParameterFromConfigV2(config, "key", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return err } + meta.key = key.(string) - if value, found := config.AuthParams["keyPassword"]; found { - meta.keyPassword = value - } else { - meta.keyPassword = "" + unsafeSslRaw, err := getParameterFromConfigV2(config, "unsafeSsl", false, true, false, true, defaultUnsafeSsl, reflect.TypeOf(true)) + if err != nil { + return errors.New(fmt.Sprintf("error parsing unsafeSsl: %s", err.Error())) + } + meta.unsafeSsl = unsafeSslRaw.(bool) + + keyPassword, err := getParameterFromConfigV2(config, "keyPassword", false, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err } + meta.keyPassword = keyPassword.(string) + meta.enableTLS = true return nil } -func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { - if config.AuthParams["username"] == "" { - return errors.New("no username given") +func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { + username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return errors.New(fmt.Sprintf("no username given. %s", err.Error())) } - meta.username = strings.TrimSpace(config.AuthParams["username"]) + meta.username = strings.TrimSpace(username.(string)) - if (config.AuthParams["password"] == "" && config.AuthParams["keytab"] == "") || - (config.AuthParams["password"] != "" && config.AuthParams["keytab"] != "") { + password, err := getParameterFromConfigV2(config, "password", false, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err + } + keytab, err := getParameterFromConfigV2(config, "keytab", false, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err + } + if (password == "" && keytab == "") || + (password != "" && keytab != "") { return errors.New("exactly one of 'password' or 'keytab' must be provided for GSSAPI authentication") } - if config.AuthParams["password"] != "" { - meta.password = strings.TrimSpace(config.AuthParams["password"]) + + if password != "" { + meta.password = strings.TrimSpace(password.(string)) } else { - path, err := saveToFile(config.AuthParams["keytab"]) + path, err := saveToFile(keytab.(string)) if err != nil { return fmt.Errorf("error saving keytab to file: %w", err) } meta.keytabPath = path } - if config.AuthParams["realm"] == "" { - return errors.New("no realm given") + realm, err := getParameterFromConfigV2(config, "realm", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return errors.New(fmt.Sprintf("no realm given. %s", err.Error())) } - meta.realm = strings.TrimSpace(config.AuthParams["realm"]) + meta.realm = strings.TrimSpace(realm.(string)) - if config.AuthParams["kerberosConfig"] == "" { - return errors.New("no Kerberos configuration file (kerberosConfig) given") + kerberosConfig, err := getParameterFromConfigV2(config, "kerberosConfig", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return errors.New(fmt.Sprintf("no Kerberos configuration file (kerberosConfig) given. %s", err.Error())) } - path, err := saveToFile(config.AuthParams["kerberosConfig"]) + path, err := saveToFile(kerberosConfig.(string)) if err != nil { return fmt.Errorf("error saving kerberosConfig to file: %w", err) } @@ -300,35 +309,42 @@ func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata return nil } -func parseSaslParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { - if config.AuthParams["username"] == "" { - return errors.New("no username given") +func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { + username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return errors.New(fmt.Sprintf("no username given. %s", err.Error())) } - meta.username = strings.TrimSpace(config.AuthParams["username"]) + meta.username = strings.TrimSpace(username.(string)) - if config.AuthParams["password"] == "" { - return errors.New("no password given") + password, err := getParameterFromConfigV2(config, "password", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return errors.New(fmt.Sprintf("no password given. %s", err.Error())) } - meta.password = strings.TrimSpace(config.AuthParams["password"]) + meta.password = strings.TrimSpace(password.(string)) meta.saslType = mode if mode == KafkaSASLTypeOAuthbearer { - meta.scopes = strings.Split(config.AuthParams["scopes"], ",") + scopes, err := getParameterFromConfigV2(config, "scopes", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return errors.New(fmt.Sprintf("no scopes given. %s", err.Error())) + } + meta.scopes = strings.Split(scopes.(string), ",") - if config.AuthParams["oauthTokenEndpointUri"] == "" { - return errors.New("no oauth token endpoint uri given") + oauthTokenEndpointsURI, err := getParameterFromConfigV2(config, "oauthTokenEndpointUri", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return errors.New(fmt.Sprintf("no oauth token endpoint uri given. %s", err.Error())) } - meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"]) + meta.oauthTokenEndpointURI = strings.TrimSpace(oauthTokenEndpointsURI.(string)) meta.oauthExtensions = make(map[string]string) - oauthExtensionsRaw := config.AuthParams["oauthExtensions"] + oauthExtensionsRaw, _ := getParameterFromConfigV2(config, "oauthExtensions", false, true, false, true, "", reflect.TypeOf("")) if oauthExtensionsRaw != "" { - for _, extension := range strings.Split(oauthExtensionsRaw, ",") { - splittedExtension := strings.Split(extension, "=") - if len(splittedExtension) != 2 { + for _, extension := range strings.Split(oauthExtensionsRaw.(string), ",") { + splitExtension := strings.Split(extension, "=") + if len(splitExtension) != 2 { return errors.New("invalid OAuthBearer extension, must be of format key=value") } - meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1] + meta.oauthExtensions[splitExtension[0]] = splitExtension[1] } } } @@ -364,42 +380,38 @@ func saveToFile(content string) (string, error) { func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (kafkaMetadata, error) { meta := kafkaMetadata{} - switch { - case config.TriggerMetadata["bootstrapServersFromEnv"] != "": - meta.bootstrapServers = strings.Split(config.ResolvedEnv[config.TriggerMetadata["bootstrapServersFromEnv"]], ",") - case config.TriggerMetadata["bootstrapServers"] != "": - meta.bootstrapServers = strings.Split(config.TriggerMetadata["bootstrapServers"], ",") - default: - return meta, errors.New("no bootstrapServers given") + bootstrapServers, err := getParameterFromConfigV2(config, "bootstrapServers", true, false, true, false, "", reflect.TypeOf("")) + if err != nil { + return meta, errors.New(fmt.Sprintf("no bootstrapServers given. %s", err.Error())) } + meta.bootstrapServers = strings.Split(bootstrapServers.(string), ",") - switch { - case config.TriggerMetadata["consumerGroupFromEnv"] != "": - meta.group = config.ResolvedEnv[config.TriggerMetadata["consumerGroupFromEnv"]] - case config.TriggerMetadata["consumerGroup"] != "": - meta.group = config.TriggerMetadata["consumerGroup"] - default: - return meta, errors.New("no consumer group given") + consumerGroup, err := getParameterFromConfigV2(config, "consumerGroup", true, false, true, false, "", reflect.TypeOf("")) + if err != nil { + return meta, errors.New(fmt.Sprintf("no consumer group given. %s", err.Error())) } + meta.group = consumerGroup.(string) - switch { - case config.TriggerMetadata["topicFromEnv"] != "": - meta.topic = config.ResolvedEnv[config.TriggerMetadata["topicFromEnv"]] - case config.TriggerMetadata["topic"] != "": - meta.topic = config.TriggerMetadata["topic"] - default: - meta.topic = "" + topic, err := getParameterFromConfigV2(config, "topic", true, false, true, true, "", reflect.TypeOf("")) + if err != nil { + return meta, err + } + if topic == "" { logger.V(1).Info(fmt.Sprintf("consumer group %q has no topic specified, "+ "will use all topics subscribed by the consumer group for scaling", meta.group)) } + meta.topic = topic.(string) meta.partitionLimitation = nil - partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"]) + partitionLimitationMetadata, err := getParameterFromConfigV2(config, "partitionLimitation", true, false, false, true, "", reflect.TypeOf("")) + if err != nil { + return meta, err + } if partitionLimitationMetadata != "" { if meta.topic == "" { logger.V(1).Info("no specific topic set, ignoring partitionLimitation setting") } else { - pattern := config.TriggerMetadata["partitionLimitation"] + pattern := strings.TrimSpace(partitionLimitationMetadata.(string)) parsed, err := kedautil.ParseInt32List(pattern) if err != nil { return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %w", pattern, err) @@ -410,96 +422,80 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) } meta.offsetResetPolicy = defaultOffsetResetPolicy - - if config.TriggerMetadata["offsetResetPolicy"] != "" { - policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"]) - if policy != earliest && policy != latest { - return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy) + offsetResetPolicyRaw, err := getParameterFromConfigV2(config, "offsetResetPolicy", true, false, false, true, "", reflect.TypeOf("")) + if err != nil { + return meta, err + } + if offsetResetPolicyRaw != "" { + if offsetResetPolicyRaw != earliest && offsetResetPolicyRaw != latest { + return meta, fmt.Errorf("err offsetResetPolicy policy %q given", offsetResetPolicyRaw) } - meta.offsetResetPolicy = policy + meta.offsetResetPolicy = offsetResetPolicy(offsetResetPolicyRaw.(string)) } - meta.lagThreshold = defaultKafkaLagThreshold - - if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return meta, fmt.Errorf("error parsing %q: %w", lagThresholdMetricName, err) - } - if t <= 0 { - return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName) - } - meta.lagThreshold = t + lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64)) + if err != nil { + return meta, err + } + if lagThreshold.(int) <= 0 { + return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName) } + meta.lagThreshold = int64(lagThreshold.(int)) meta.activationLagThreshold = defaultKafkaActivationLagThreshold - - if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return meta, fmt.Errorf("error parsing %q: %w", activationLagThresholdMetricName, err) - } - if t < 0 { - return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) - } - meta.activationLagThreshold = t + activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, defaultKafkaActivationLagThreshold, reflect.TypeOf(int64(64))) + if err != nil { + return meta, err + } + fmt.Println(activationLagThreshold) + if int64(activationLagThreshold.(int)) < 0 { + return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) } + meta.activationLagThreshold = int64(activationLagThreshold.(int)) if err := parseKafkaAuthParams(config, &meta); err != nil { return meta, err } - meta.allowIdleConsumers = false - if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err) - } - meta.allowIdleConsumers = t + allowIdConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, errors.New(fmt.Sprintf("error parsing allowIdleConsumers: %s", err.Error())) } + meta.allowIdleConsumers = allowIdConsumers.(bool) - meta.excludePersistentLag = false - if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err) - } - meta.excludePersistentLag = t + excludePersistentLag, err := getParameterFromConfigV2(config, "excludePersistentLag", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, errors.New(fmt.Sprintf("error parsing excludePersistentLag: %s", err.Error())) } + meta.excludePersistentLag = excludePersistentLag.(bool) - meta.scaleToZeroOnInvalidOffset = false - if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err) - } - meta.scaleToZeroOnInvalidOffset = t + scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2(config, "scaleToZeroOnInvalidOffset", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, errors.New(fmt.Sprintf("error parsing scaleToZeroOnInvalidOffset: %s", err.Error())) } + meta.scaleToZeroOnInvalidOffset = scaleToZeroOnInvalidOffset.(bool) - meta.limitToPartitionsWithLag = false - if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err) - } - meta.limitToPartitionsWithLag = t - - if meta.allowIdleConsumers && meta.limitToPartitionsWithLag { - return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") - } - if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { - return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") - } + limitToPartitionsWithLag, err := getParameterFromConfigV2(config, "limitToPartitionsWithLag", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, err } + meta.limitToPartitionsWithLag = limitToPartitionsWithLag.(bool) + if meta.allowIdleConsumers && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") + } + if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") + } + saramaVer, err := getParameterFromConfigV2(config, "version", true, false, false, true, "", reflect.TypeOf("")) + if err != nil { + return meta, err + } meta.version = sarama.V1_0_0_0 - if val, ok := config.TriggerMetadata["version"]; ok { - val = strings.TrimSpace(val) - version, err := sarama.ParseKafkaVersion(val) - if err != nil { - return meta, fmt.Errorf("error parsing kafka version: %w", err) - } - meta.version = version + saramaVer = strings.TrimSpace(saramaVer.(string)) + version, err := sarama.ParseKafkaVersion(saramaVer.(string)) + if err != nil { + return meta, fmt.Errorf("error parsing kafka version: %w", err) } meta.triggerIndex = config.TriggerIndex return meta, nil diff --git a/pkg/scalers/scaler_test.go b/pkg/scalers/scaler_test.go index 59508b9ba58..ab7d2c193f2 100644 --- a/pkg/scalers/scaler_test.go +++ b/pkg/scalers/scaler_test.go @@ -240,7 +240,7 @@ var getParameterFromConfigTestDataset = []getParameterFromConfigTestData{ targetType: reflect.TypeOf(string("")), expectedResult: "default", // Should return empty string isError: true, - errorMessage: "key not found. Either set the correct key or set isOptional to true and set defaultVal", + errorMessage: "key key2 not found. Either set the correct key or set isOptional to true and set defaultVal", }, { name: "test_authParam_bool", From 2c9c21cc34e339c91f33e905feace6cd395594a1 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Thu, 28 Dec 2023 15:42:51 +0000 Subject: [PATCH 02/11] Rebase from master Signed-off-by: dttung2905 --- pkg/scalers/kafka_scaler.go | 42 +++++++++++++------------------- pkg/scalers/kafka_scaler_test.go | 22 ++++++++--------- pkg/scalers/scaler_test.go | 2 +- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 4a7d69a9cf9..290147ecb53 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -184,8 +184,8 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat } meta.enableTLS = false - enableTLS := false - tlsString, err := getParameterFromConfigV2(config, "tls", true, false, false, true, "", reflect.TypeOf("")) + var enableTLS bool + tlsString, err := getParameterFromConfigV2(config, "tls", true, true, false, true, "disable", reflect.TypeOf("")) if err != nil { return fmt.Errorf("error incorrect TLS value given. %s", err.Error()) } @@ -207,14 +207,17 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat } func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { - certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, false, "", reflect.TypeOf("")) + certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, true, "", reflect.TypeOf("")) if err != nil { return err } - keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, false, "", reflect.TypeOf("")) + meta.cert = certGiven.(string) + + keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, true, "", reflect.TypeOf("")) if err != nil { return err } + meta.key = keyGiven.(string) if certGiven != "" && keyGiven == "" { return errors.New("key must be provided with cert") @@ -223,23 +226,13 @@ func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { return errors.New("cert must be provided with key") } - ca, err := getParameterFromConfigV2(config, "ca", false, true, false, false, "", reflect.TypeOf("")) + ca, err := getParameterFromConfigV2(config, "ca", false, true, false, true, "", reflect.TypeOf("")) if err != nil { return err } meta.ca = ca.(string) - cert, err := getParameterFromConfigV2(config, "cert", false, true, false, false, "", reflect.TypeOf("")) - if err != nil { - return err - } - meta.cert = cert.(string) - key, err := getParameterFromConfigV2(config, "key", false, true, false, false, "", reflect.TypeOf("")) - if err != nil { - return err - } - meta.key = key.(string) - unsafeSslRaw, err := getParameterFromConfigV2(config, "unsafeSsl", false, true, false, true, defaultUnsafeSsl, reflect.TypeOf(true)) + unsafeSslRaw, err := getParameterFromConfigV2(config, "unsafeSsl", true, false, false, true, defaultUnsafeSsl, reflect.TypeOf(true)) if err != nil { return errors.New(fmt.Sprintf("error parsing unsafeSsl: %s", err.Error())) } @@ -324,7 +317,7 @@ func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslTy meta.saslType = mode if mode == KafkaSASLTypeOAuthbearer { - scopes, err := getParameterFromConfigV2(config, "scopes", false, true, false, false, "", reflect.TypeOf("")) + scopes, err := getParameterFromConfigV2(config, "scopes", false, true, false, true, "", reflect.TypeOf("")) if err != nil { return errors.New(fmt.Sprintf("no scopes given. %s", err.Error())) } @@ -427,10 +420,11 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) return meta, err } if offsetResetPolicyRaw != "" { - if offsetResetPolicyRaw != earliest && offsetResetPolicyRaw != latest { + policy := offsetResetPolicy(offsetResetPolicyRaw.(string)) + if policy != earliest && policy != latest { return meta, fmt.Errorf("err offsetResetPolicy policy %q given", offsetResetPolicyRaw) } - meta.offsetResetPolicy = offsetResetPolicy(offsetResetPolicyRaw.(string)) + meta.offsetResetPolicy = policy } lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64)) @@ -443,15 +437,14 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) meta.lagThreshold = int64(lagThreshold.(int)) meta.activationLagThreshold = defaultKafkaActivationLagThreshold - activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, defaultKafkaActivationLagThreshold, reflect.TypeOf(int64(64))) + activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, int64(defaultKafkaActivationLagThreshold), reflect.TypeOf(int64(64))) if err != nil { return meta, err } - fmt.Println(activationLagThreshold) - if int64(activationLagThreshold.(int)) < 0 { + if activationLagThreshold.(int64) < 0 { return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) } - meta.activationLagThreshold = int64(activationLagThreshold.(int)) + meta.activationLagThreshold = activationLagThreshold.(int64) if err := parseKafkaAuthParams(config, &meta); err != nil { return meta, err @@ -487,11 +480,10 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") } - saramaVer, err := getParameterFromConfigV2(config, "version", true, false, false, true, "", reflect.TypeOf("")) + saramaVer, err := getParameterFromConfigV2(config, "version", true, false, false, true, "1.0.0", reflect.TypeOf("")) if err != nil { return meta, err } - meta.version = sarama.V1_0_0_0 saramaVer = strings.TrimSpace(saramaVer.(string)) version, err := sarama.ParseKafkaVersion(saramaVer.(string)) if err != nil { diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 81ebc746443..fd59e80de27 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -321,8 +321,8 @@ var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ } func TestGetBrokers(t *testing.T) { - for _, testData := range parseKafkaMetadataTestDataset { - meta, err := parseKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard()) + for idx, testData := range parseKafkaMetadataTestDataset { + meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard()) getBrokerTestBase(t, meta, testData, err) meta, err = parseKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithoutAuthParams}, logr.Discard()) @@ -375,30 +375,30 @@ func getBrokerTestBase(t *testing.T, meta kafkaMetadata, testData parseKafkaMeta } func TestKafkaAuthParamsInTriggerAuthentication(t *testing.T) { - for _, testData := range parseKafkaAuthParamsTestDataset { - meta, err := parseKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) + for idx, testData := range parseKafkaAuthParamsTestDataset { + meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) if err != nil && !testData.isError { - t.Error("Expected success but got error", err) + t.Errorf("Test %v: expected success but got error %v", idx, err) } if testData.isError && err == nil { - t.Error("Expected error but got success") + t.Errorf("Test %v: expected error but got success", idx) } if meta.enableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS) + t.Errorf("Test %v: expected enableTLS to be set to %v but got %v\n", idx, testData.enableTLS, meta.enableTLS) } if meta.enableTLS { if meta.ca != testData.authParams["ca"] { - t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS) + t.Errorf("Test %v: expected ca to be set to %v but got %v\n", idx, testData.authParams["ca"], meta.enableTLS) } if meta.cert != testData.authParams["cert"] { - t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert) + t.Errorf("Test %v: expected cert to be set to %v but got %v\n", idx, testData.authParams["cert"], meta.cert) } if meta.key != testData.authParams["key"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key) + t.Errorf("Test %v: expected key to be set to %v but got %v\n", idx, testData.authParams["key"], meta.key) } if meta.keyPassword != testData.authParams["keyPassword"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key) + t.Errorf("Test %v: expected key to be set to %v but got %v\n", idx, testData.authParams["keyPassword"], meta.key) } } if meta.saslType == KafkaSASLTypeGSSAPI && !testData.isError { diff --git a/pkg/scalers/scaler_test.go b/pkg/scalers/scaler_test.go index ab7d2c193f2..59508b9ba58 100644 --- a/pkg/scalers/scaler_test.go +++ b/pkg/scalers/scaler_test.go @@ -240,7 +240,7 @@ var getParameterFromConfigTestDataset = []getParameterFromConfigTestData{ targetType: reflect.TypeOf(string("")), expectedResult: "default", // Should return empty string isError: true, - errorMessage: "key key2 not found. Either set the correct key or set isOptional to true and set defaultVal", + errorMessage: "key not found. Either set the correct key or set isOptional to true and set defaultVal", }, { name: "test_authParam_bool", From b088676897922f1d73b86504cb3ed1ed2b82f54e Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Thu, 28 Dec 2023 17:43:42 +0000 Subject: [PATCH 03/11] Fix Semgrep Signed-off-by: dttung2905 --- pkg/scalers/kafka_scaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 290147ecb53..53ee4a91a18 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -187,7 +187,7 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat var enableTLS bool tlsString, err := getParameterFromConfigV2(config, "tls", true, true, false, true, "disable", reflect.TypeOf("")) if err != nil { - return fmt.Errorf("error incorrect TLS value given. %s", err.Error()) + return fmt.Errorf("error incorrect TLS value given. %w", err) } tlsString = strings.TrimSpace(tlsString.(string)) switch tlsString.(string) { From 51ad4a8c355b928de43670bb16b2e1f752ee42d8 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Thu, 28 Dec 2023 19:39:06 +0000 Subject: [PATCH 04/11] Fix static check CI failure Signed-off-by: dttung2905 --- pkg/scalers/kafka_scaler.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 53ee4a91a18..f50e14a714d 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -234,7 +234,7 @@ func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { unsafeSslRaw, err := getParameterFromConfigV2(config, "unsafeSsl", true, false, false, true, defaultUnsafeSsl, reflect.TypeOf(true)) if err != nil { - return errors.New(fmt.Sprintf("error parsing unsafeSsl: %s", err.Error())) + return fmt.Errorf("error parsing unsafeSsl: %w", err) } meta.unsafeSsl = unsafeSslRaw.(bool) @@ -251,7 +251,7 @@ func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) if err != nil { - return errors.New(fmt.Sprintf("no username given. %s", err.Error())) + return fmt.Errorf("no username given. %w", err) } meta.username = strings.TrimSpace(username.(string)) @@ -280,13 +280,13 @@ func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSa realm, err := getParameterFromConfigV2(config, "realm", false, true, false, false, "", reflect.TypeOf("")) if err != nil { - return errors.New(fmt.Sprintf("no realm given. %s", err.Error())) + return fmt.Errorf("no realm given. %w", err) } meta.realm = strings.TrimSpace(realm.(string)) kerberosConfig, err := getParameterFromConfigV2(config, "kerberosConfig", false, true, false, false, "", reflect.TypeOf("")) if err != nil { - return errors.New(fmt.Sprintf("no Kerberos configuration file (kerberosConfig) given. %s", err.Error())) + return fmt.Errorf("no Kerberos configuration file (kerberosConfig) given. %w", err) } path, err := saveToFile(kerberosConfig.(string)) if err != nil { @@ -305,13 +305,13 @@ func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSa func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) if err != nil { - return errors.New(fmt.Sprintf("no username given. %s", err.Error())) + return fmt.Errorf("no username given. %w", err) } meta.username = strings.TrimSpace(username.(string)) password, err := getParameterFromConfigV2(config, "password", false, true, false, false, "", reflect.TypeOf("")) if err != nil { - return errors.New(fmt.Sprintf("no password given. %s", err.Error())) + return fmt.Errorf("no password given. %w", err) } meta.password = strings.TrimSpace(password.(string)) meta.saslType = mode @@ -319,13 +319,13 @@ func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslTy if mode == KafkaSASLTypeOAuthbearer { scopes, err := getParameterFromConfigV2(config, "scopes", false, true, false, true, "", reflect.TypeOf("")) if err != nil { - return errors.New(fmt.Sprintf("no scopes given. %s", err.Error())) + return fmt.Errorf("no scopes given. %w", err) } meta.scopes = strings.Split(scopes.(string), ",") oauthTokenEndpointsURI, err := getParameterFromConfigV2(config, "oauthTokenEndpointUri", false, true, false, false, "", reflect.TypeOf("")) if err != nil { - return errors.New(fmt.Sprintf("no oauth token endpoint uri given. %s", err.Error())) + return fmt.Errorf("no oauth token endpoint uri given. %w", err) } meta.oauthTokenEndpointURI = strings.TrimSpace(oauthTokenEndpointsURI.(string)) @@ -375,13 +375,13 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) meta := kafkaMetadata{} bootstrapServers, err := getParameterFromConfigV2(config, "bootstrapServers", true, false, true, false, "", reflect.TypeOf("")) if err != nil { - return meta, errors.New(fmt.Sprintf("no bootstrapServers given. %s", err.Error())) + return meta, fmt.Errorf("no bootstrapServers given. %w", err) } meta.bootstrapServers = strings.Split(bootstrapServers.(string), ",") consumerGroup, err := getParameterFromConfigV2(config, "consumerGroup", true, false, true, false, "", reflect.TypeOf("")) if err != nil { - return meta, errors.New(fmt.Sprintf("no consumer group given. %s", err.Error())) + return meta, fmt.Errorf("no consumer group given. %w", err) } meta.group = consumerGroup.(string) @@ -450,21 +450,21 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) return meta, err } - allowIdConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false, reflect.TypeOf(true)) + allowIDConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false, reflect.TypeOf(true)) if err != nil { - return meta, errors.New(fmt.Sprintf("error parsing allowIdleConsumers: %s", err.Error())) + return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err) } - meta.allowIdleConsumers = allowIdConsumers.(bool) + meta.allowIdleConsumers = allowIDConsumers.(bool) excludePersistentLag, err := getParameterFromConfigV2(config, "excludePersistentLag", true, false, false, true, false, reflect.TypeOf(true)) if err != nil { - return meta, errors.New(fmt.Sprintf("error parsing excludePersistentLag: %s", err.Error())) + return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err) } meta.excludePersistentLag = excludePersistentLag.(bool) scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2(config, "scaleToZeroOnInvalidOffset", true, false, false, true, false, reflect.TypeOf(true)) if err != nil { - return meta, errors.New(fmt.Sprintf("error parsing scaleToZeroOnInvalidOffset: %s", err.Error())) + return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err) } meta.scaleToZeroOnInvalidOffset = scaleToZeroOnInvalidOffset.(bool) From 83416e25424241145a32b4a4f5784b4205db694e Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Tue, 2 Jan 2024 22:07:29 +0000 Subject: [PATCH 05/11] Add apache_kafka_scaler Signed-off-by: dttung2905 --- pkg/scalers/apache_kafka_scaler.go | 190 ++++++++++++++--------------- 1 file changed, 91 insertions(+), 99 deletions(-) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index bc13daf3fb9..a4046b9de3e 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -23,6 +23,7 @@ import ( "crypto/tls" "errors" "fmt" + "reflect" "strconv" "strings" @@ -120,83 +121,75 @@ func NewApacheKafkaScaler(ctx context.Context, config *scalersconfig.ScalerConfi func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apacheKafkaMetadata) error { meta.enableTLS = false - enableTLS := false - if val, ok := config.TriggerMetadata["tls"]; ok { - switch val { - case stringEnable: - enableTLS = true - case stringDisable: - enableTLS = false - default: - return fmt.Errorf("error incorrect TLS value given, got %s", val) - } + var enableTLS bool + tlsString, err := getParameterFromConfigV2(config, "tls", true, true, false, true, "disable", reflect.TypeOf("")) + if err != nil { + return fmt.Errorf("error incorrect TLS value given. %w", err) + } + tlsString = strings.TrimSpace(tlsString.(string)) + switch tlsString.(string) { + case stringEnable: + enableTLS = true + case stringDisable: + enableTLS = false + default: + return fmt.Errorf("error incorrect TLS value given, got %s", tlsString.(string)) } - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - if enableTLS { - return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together") + if enableTLS { + certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err } - switch val { - case stringEnable: - enableTLS = true - case stringDisable: - enableTLS = false - default: - return fmt.Errorf("error incorrect TLS value given, got %s", val) + keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err } - } - - if enableTLS { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { + if certGiven == "" && keyGiven != "" { return errors.New("key must be provided with cert") } - if keyGiven && !certGiven { + if keyGiven == "" && certGiven != "" { return errors.New("cert must be provided with key") } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - if value, found := config.AuthParams["keyPassword"]; found { - meta.keyPassword = value - } else { - meta.keyPassword = "" + ca, err := getParameterFromConfigV2(config, "ca", false, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err } + meta.ca = ca.(string) + meta.cert = certGiven.(string) + meta.key = keyGiven.(string) + keyPassword, err := getParameterFromConfigV2(config, "keyPassword", false, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err + } + meta.keyPassword = keyPassword.(string) meta.enableTLS = true } meta.saslType = KafkaSASLTypeNone - var saslAuthType string - switch { - case config.TriggerMetadata["sasl"] != "": - saslAuthType = config.TriggerMetadata["sasl"] - default: - saslAuthType = "" - } - if val, ok := config.AuthParams["sasl"]; ok { - if saslAuthType != "" { - return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together") - } - saslAuthType = val + saslAuthType, err := getParameterFromConfigV2(config, "sasl", true, true, false, true, "", reflect.TypeOf("")) + if err != nil { + return err } if saslAuthType != "" { - saslAuthType = strings.TrimSpace(saslAuthType) - switch mode := kafkaSaslType(saslAuthType); mode { + saslAuthType = strings.TrimSpace(saslAuthType.(string)) + switch mode := kafkaSaslType(saslAuthType.(string)); mode { case KafkaSASLTypeMskIam: meta.saslType = mode - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val + awsEndpoint, err := getParameterFromConfigV2(config, "awsEndpoint", true, false, false, true, "", reflect.TypeOf("")) + if err != nil { + return err + } + if awsEndpoint != "" { + meta.awsEndpoint = awsEndpoint.(string) } if !meta.enableTLS { return errors.New("TLS is required for MSK") } - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return errors.New("no awsRegion given") + awsRegion, err := getParameterFromConfigV2(config, "awsRegion", true, false, false, false, "", reflect.TypeOf("")) + if err != nil { + return fmt.Errorf("%w. No awsRegion given", err) } auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) if err != nil { @@ -208,16 +201,17 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache case KafkaSASLTypeSCRAMSHA256: fallthrough case KafkaSASLTypeSCRAMSHA512: - if val, ok := config.AuthParams["username"]; ok { - meta.username = strings.TrimSpace(val) - } else { - return errors.New("no username given") + username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return fmt.Errorf("%w. No username given", err) } - if val, ok := config.AuthParams["password"]; ok { - meta.password = strings.TrimSpace(val) - } else { - return errors.New("no password given") + meta.username = strings.TrimSpace(username.(string)) + + password, err := getParameterFromConfigV2(config, "password", false, true, false, false, "", reflect.TypeOf("")) + if err != nil { + return fmt.Errorf("%w. No password given", err) } + meta.password = strings.TrimSpace(password.(string)) case KafkaSASLTypeOAuthbearer: return errors.New("SASL/OAUTHBEARER is not implemented yet") default: @@ -230,42 +224,40 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (apacheKafkaMetadata, error) { meta := apacheKafkaMetadata{} - switch { - case config.TriggerMetadata["bootstrapServersFromEnv"] != "": - meta.bootstrapServers = strings.Split(config.ResolvedEnv[config.TriggerMetadata["bootstrapServersFromEnv"]], ",") - case config.TriggerMetadata["bootstrapServers"] != "": - meta.bootstrapServers = strings.Split(config.TriggerMetadata["bootstrapServers"], ",") - default: - return meta, errors.New("no bootstrapServers given") + bootstrapServers, err := getParameterFromConfigV2(config, "bootstrapServers", true, false, true, false, "", reflect.TypeOf("")) + if err != nil { + return meta, fmt.Errorf("no bootstrapServers given. %w", err) } + meta.bootstrapServers = strings.Split(bootstrapServers.(string), ",") - switch { - case config.TriggerMetadata["consumerGroupFromEnv"] != "": - meta.group = config.ResolvedEnv[config.TriggerMetadata["consumerGroupFromEnv"]] - case config.TriggerMetadata["consumerGroup"] != "": - meta.group = config.TriggerMetadata["consumerGroup"] - default: - return meta, errors.New("no consumer group given") + consumerGroup, err := getParameterFromConfigV2(config, "consumerGroup", true, false, true, false, "", reflect.TypeOf("")) + if err != nil { + return meta, fmt.Errorf("no consumer group given. %w", err) } + meta.group = consumerGroup.(string) - switch { - case config.TriggerMetadata["topicFromEnv"] != "": - meta.topic = strings.Split(config.ResolvedEnv[config.TriggerMetadata["topicFromEnv"]], ",") - case config.TriggerMetadata["topic"] != "": - meta.topic = strings.Split(config.TriggerMetadata["topic"], ",") - default: + topic, err := getParameterFromConfigV2(config, "topic", true, false, true, true, "", reflect.TypeOf("")) + if err != nil { + return meta, err + } + if topic == "" { meta.topic = []string{} logger.V(1).Info(fmt.Sprintf("consumer group %q has no topics specified, "+ "will use all topics subscribed by the consumer group for scaling", meta.group)) + } else { + meta.topic = strings.Split(topic.(string), ",") } meta.partitionLimitation = nil - partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"]) + partitionLimitationMetadata, err := getParameterFromConfigV2(config, "partitionLimitation", true, false, false, true, "", reflect.TypeOf("")) + if err != nil { + return meta, err + } if partitionLimitationMetadata != "" { if meta.topic == nil || len(meta.topic) == 0 { logger.V(1).Info("no specific topics set, ignoring partitionLimitation setting") } else { - pattern := config.TriggerMetadata["partitionLimitation"] + pattern := strings.TrimSpace(partitionLimitationMetadata.(string)) parsed, err := kedautil.ParseInt32List(pattern) if err != nil { return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %w", pattern, err) @@ -276,27 +268,27 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo } meta.offsetResetPolicy = defaultOffsetResetPolicy - - if config.TriggerMetadata["offsetResetPolicy"] != "" { - policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"]) + offsetResetPolicyRaw, err := getParameterFromConfigV2(config, "offsetResetPolicy", true, false, false, true, "", reflect.TypeOf("")) + if err != nil { + return meta, err + } + if offsetResetPolicyRaw != "" { + policy := offsetResetPolicy(offsetResetPolicyRaw.(string)) if policy != earliest && policy != latest { - return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy) + return meta, fmt.Errorf("err offsetResetPolicy policy %q given", offsetResetPolicyRaw) } meta.offsetResetPolicy = policy } meta.lagThreshold = defaultKafkaLagThreshold - - if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return meta, fmt.Errorf("error parsing %q: %w", lagThresholdMetricName, err) - } - if t <= 0 { - return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName) - } - meta.lagThreshold = t + lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64)) + if err != nil { + return meta, err + } + if lagThreshold.(int) <= 0 { + return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName) } + meta.lagThreshold = int64(lagThreshold.(int)) meta.activationLagThreshold = defaultKafkaActivationLagThreshold From 126de40ad0fb532b9cfc61f46d7a87c2a9f92d45 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Thu, 4 Jan 2024 22:27:39 +0000 Subject: [PATCH 06/11] Finish apache kafka scaler Signed-off-by: dttung2905 --- pkg/scalers/apache_kafka_scaler.go | 74 +++++++++++------------------- 1 file changed, 28 insertions(+), 46 deletions(-) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index a4046b9de3e..3ae87a9f97e 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "reflect" - "strconv" "strings" "github.com/go-logr/logr" @@ -291,63 +290,46 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo meta.lagThreshold = int64(lagThreshold.(int)) meta.activationLagThreshold = defaultKafkaActivationLagThreshold - - if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return meta, fmt.Errorf("error parsing %q: %w", activationLagThresholdMetricName, err) - } - if t < 0 { - return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) - } - meta.activationLagThreshold = t + activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, int64(defaultKafkaActivationLagThreshold), reflect.TypeOf(int64(64))) + if err != nil { + return meta, err + } + if activationLagThreshold.(int64) < 0 { + return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) } if err := parseApacheKafkaAuthParams(config, &meta); err != nil { return meta, err } - meta.allowIdleConsumers = false - if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err) - } - meta.allowIdleConsumers = t + allowIDConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err) } + meta.allowIdleConsumers = allowIDConsumers.(bool) - meta.excludePersistentLag = false - if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err) - } - meta.excludePersistentLag = t + excludePersistentLag, err := getParameterFromConfigV2(config, "excludePersistentLag", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err) } + meta.excludePersistentLag = excludePersistentLag.(bool) - meta.scaleToZeroOnInvalidOffset = false - if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err) - } - meta.scaleToZeroOnInvalidOffset = t + scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2(config, "scaleToZeroOnInvalidOffset", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err) } + meta.scaleToZeroOnInvalidOffset = scaleToZeroOnInvalidOffset.(bool) - meta.limitToPartitionsWithLag = false - if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err) - } - meta.limitToPartitionsWithLag = t - - if meta.allowIdleConsumers && meta.limitToPartitionsWithLag { - return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") - } - if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { - return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") - } + limitToPartitionsWithLag, err := getParameterFromConfigV2(config, "limitToPartitionsWithLag", true, false, false, true, false, reflect.TypeOf(true)) + if err != nil { + return meta, err + } + meta.limitToPartitionsWithLag = limitToPartitionsWithLag.(bool) + if meta.allowIdleConsumers && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") + } + if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") } meta.triggerIndex = config.TriggerIndex From 177d2b1e6269fe7cdb81caab925c5f891661f607 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Mon, 15 Jan 2024 16:41:58 +0000 Subject: [PATCH 07/11] Rebase from master Signed-off-by: dttung2905 --- pkg/scalers/apache_kafka_scaler.go | 1 + pkg/scalers/apache_kafka_scaler_test.go | 1 - pkg/scalers/kafka_scaler.go | 3 ++- pkg/scalers/kafka_scaler_test.go | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 3ae87a9f97e..2c7d8f70359 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -190,6 +190,7 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache if err != nil { return fmt.Errorf("%w. No awsRegion given", err) } + meta.awsRegion = awsRegion.(string) auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) if err != nil { return err diff --git a/pkg/scalers/apache_kafka_scaler_test.go b/pkg/scalers/apache_kafka_scaler_test.go index 11c2864cf34..504932c88ce 100644 --- a/pkg/scalers/apache_kafka_scaler_test.go +++ b/pkg/scalers/apache_kafka_scaler_test.go @@ -290,7 +290,6 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa if er != nil { t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"]) } - if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold { t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold) } diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index f50e14a714d..b01c233fb82 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -426,7 +426,7 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) } meta.offsetResetPolicy = policy } - + meta.lagThreshold = defaultKafkaLagThreshold lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64)) if err != nil { return meta, err @@ -489,6 +489,7 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) if err != nil { return meta, fmt.Errorf("error parsing kafka version: %w", err) } + meta.version = version meta.triggerIndex = config.TriggerIndex return meta, nil } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index fd59e80de27..b69a07fb2b5 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -321,7 +321,7 @@ var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ } func TestGetBrokers(t *testing.T) { - for idx, testData := range parseKafkaMetadataTestDataset { + for _, testData := range parseKafkaMetadataTestDataset { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard()) getBrokerTestBase(t, meta, testData, err) From 24ad66c2eddd8d3ec1879dc17207ceb67a92f975 Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Sun, 11 Feb 2024 17:00:28 +0000 Subject: [PATCH 08/11] Rebase from master Signed-off-by: dttung2905 --- pkg/scalers/apache_kafka_scaler.go | 236 ++++++++++++++++++--- pkg/scalers/kafka_scaler.go | 327 ++++++++++++++++++++++++++--- pkg/scalers/kafka_scaler_test.go | 4 +- 3 files changed, 508 insertions(+), 59 deletions(-) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 2c7d8f70359..6edaf51d5e3 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -121,7 +121,16 @@ func NewApacheKafkaScaler(ctx context.Context, config *scalersconfig.ScalerConfi func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apacheKafkaMetadata) error { meta.enableTLS = false var enableTLS bool - tlsString, err := getParameterFromConfigV2(config, "tls", true, true, false, true, "disable", reflect.TypeOf("")) + tlsString, err := getParameterFromConfigV2( + config, + "tls", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal("disable"), + ) if err != nil { return fmt.Errorf("error incorrect TLS value given. %w", err) } @@ -136,11 +145,29 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache } if enableTLS { - certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, true, "", reflect.TypeOf("")) + certGiven, err := getParameterFromConfigV2( + config, + "cert", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } - keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, true, "", reflect.TypeOf("")) + keyGiven, err := getParameterFromConfigV2( + config, + "key", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -150,14 +177,32 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache if keyGiven == "" && certGiven != "" { return errors.New("cert must be provided with key") } - ca, err := getParameterFromConfigV2(config, "ca", false, true, false, true, "", reflect.TypeOf("")) + ca, err := getParameterFromConfigV2( + config, + "ca", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } meta.ca = ca.(string) meta.cert = certGiven.(string) meta.key = keyGiven.(string) - keyPassword, err := getParameterFromConfigV2(config, "keyPassword", false, true, false, true, "", reflect.TypeOf("")) + keyPassword, err := getParameterFromConfigV2( + config, + "keyPassword", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -166,7 +211,16 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache } meta.saslType = KafkaSASLTypeNone - saslAuthType, err := getParameterFromConfigV2(config, "sasl", true, true, false, true, "", reflect.TypeOf("")) + saslAuthType, err := getParameterFromConfigV2( + config, + "sasl", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -176,7 +230,16 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache switch mode := kafkaSaslType(saslAuthType.(string)); mode { case KafkaSASLTypeMskIam: meta.saslType = mode - awsEndpoint, err := getParameterFromConfigV2(config, "awsEndpoint", true, false, false, true, "", reflect.TypeOf("")) + awsEndpoint, err := getParameterFromConfigV2( + config, + "awsEndpoint", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -186,7 +249,15 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache if !meta.enableTLS { return errors.New("TLS is required for MSK") } - awsRegion, err := getParameterFromConfigV2(config, "awsRegion", true, false, false, false, "", reflect.TypeOf("")) + awsRegion, err := getParameterFromConfigV2( + config, + "awsRegion", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(false), + ) if err != nil { return fmt.Errorf("%w. No awsRegion given", err) } @@ -201,13 +272,30 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache case KafkaSASLTypeSCRAMSHA256: fallthrough case KafkaSASLTypeSCRAMSHA512: - username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) + username, err := getParameterFromConfigV2( + config, + "username", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("%w. No username given", err) } meta.username = strings.TrimSpace(username.(string)) - - password, err := getParameterFromConfigV2(config, "password", false, true, false, false, "", reflect.TypeOf("")) + password, err := getParameterFromConfigV2( + config, + "password", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("%w. No password given", err) } @@ -224,19 +312,45 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (apacheKafkaMetadata, error) { meta := apacheKafkaMetadata{} - bootstrapServers, err := getParameterFromConfigV2(config, "bootstrapServers", true, false, true, false, "", reflect.TypeOf("")) + bootstrapServers, err := getParameterFromConfigV2( + config, + "bootstrapServers", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(true), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return meta, fmt.Errorf("no bootstrapServers given. %w", err) } meta.bootstrapServers = strings.Split(bootstrapServers.(string), ",") - consumerGroup, err := getParameterFromConfigV2(config, "consumerGroup", true, false, true, false, "", reflect.TypeOf("")) + consumerGroup, err := getParameterFromConfigV2( + config, + "consumerGroup", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(true), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return meta, fmt.Errorf("no consumer group given. %w", err) } meta.group = consumerGroup.(string) - - topic, err := getParameterFromConfigV2(config, "topic", true, false, true, true, "", reflect.TypeOf("")) + topic, err := getParameterFromConfigV2( + config, + "topic", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(true), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return meta, err } @@ -249,7 +363,17 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo } meta.partitionLimitation = nil - partitionLimitationMetadata, err := getParameterFromConfigV2(config, "partitionLimitation", true, false, false, true, "", reflect.TypeOf("")) + partitionLimitationMetadata, err := getParameterFromConfigV2( + config, + "partitionLimitation", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) + if err != nil { return meta, err } @@ -268,7 +392,16 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo } meta.offsetResetPolicy = defaultOffsetResetPolicy - offsetResetPolicyRaw, err := getParameterFromConfigV2(config, "offsetResetPolicy", true, false, false, true, "", reflect.TypeOf("")) + offsetResetPolicyRaw, err := getParameterFromConfigV2( + config, + "offsetResetPolicy", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return meta, err } @@ -281,7 +414,17 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo } meta.lagThreshold = defaultKafkaLagThreshold - lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64)) + lagThreshold, err := getParameterFromConfigV2( + config, + lagThresholdMetricName, + reflect.TypeOf(64), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(defaultKafkaLagThreshold), + ) + if err != nil { return meta, err } @@ -291,7 +434,16 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo meta.lagThreshold = int64(lagThreshold.(int)) meta.activationLagThreshold = defaultKafkaActivationLagThreshold - activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, int64(defaultKafkaActivationLagThreshold), reflect.TypeOf(int64(64))) + activationLagThreshold, err := getParameterFromConfigV2( + config, + activationLagThresholdMetricName, + reflect.TypeOf(int64(64)), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(int64(defaultKafkaActivationLagThreshold)), + ) if err != nil { return meta, err } @@ -303,25 +455,61 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo return meta, err } - allowIDConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false, reflect.TypeOf(true)) + allowIDConsumers, err := getParameterFromConfigV2( + config, + "allowIdleConsumers", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) if err != nil { return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err) } meta.allowIdleConsumers = allowIDConsumers.(bool) - excludePersistentLag, err := getParameterFromConfigV2(config, "excludePersistentLag", true, false, false, true, false, reflect.TypeOf(true)) + excludePersistentLag, err := getParameterFromConfigV2( + config, + "excludePersistentLag", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) if err != nil { return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err) } meta.excludePersistentLag = excludePersistentLag.(bool) - scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2(config, "scaleToZeroOnInvalidOffset", true, false, false, true, false, reflect.TypeOf(true)) + scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2( + config, + "scaleToZeroOnInvalidOffset", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) if err != nil { return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err) } meta.scaleToZeroOnInvalidOffset = scaleToZeroOnInvalidOffset.(bool) - limitToPartitionsWithLag, err := getParameterFromConfigV2(config, "limitToPartitionsWithLag", true, false, false, true, false, reflect.TypeOf(true)) + limitToPartitionsWithLag, err := getParameterFromConfigV2( + config, + "limitToPartitionsWithLag", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) if err != nil { return meta, err } @@ -426,7 +614,7 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string] for _, topic := range metadata.Topics { partitions := make([]int, 0) for _, partition := range topic.Partitions { - // if no partitions limitatitions are specified, all partitions are considered + // if no partitions limitations are specified, all partitions are considered if (len(s.metadata.partitionLimitation) == 0) || (len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) { partitions = append(partitions, partition.ID) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index b01c233fb82..6d259a4c640 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -158,7 +158,16 @@ func NewKafkaScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { meta.saslType = KafkaSASLTypeNone - saslAuthType, err := getParameterFromConfigV2(config, "sasl", true, true, false, true, "", reflect.TypeOf("")) + saslAuthType, err := getParameterFromConfigV2( + config, + "sasl", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(true), + UseResolvedEnv(true), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -185,7 +194,16 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat meta.enableTLS = false var enableTLS bool - tlsString, err := getParameterFromConfigV2(config, "tls", true, true, false, true, "disable", reflect.TypeOf("")) + tlsString, err := getParameterFromConfigV2( + config, + "tls", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal("disable"), + ) if err != nil { return fmt.Errorf("error incorrect TLS value given. %w", err) } @@ -206,14 +224,32 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat return nil } -func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { - certGiven, err := getParameterFromConfigV2(config, "cert", false, true, false, true, "", reflect.TypeOf("")) +func parseTLS(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { + certGiven, err := getParameterFromConfigV2( + config, + "cert", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } meta.cert = certGiven.(string) - keyGiven, err := getParameterFromConfigV2(config, "key", false, true, false, true, "", reflect.TypeOf("")) + keyGiven, err := getParameterFromConfigV2( + config, + "key", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -226,19 +262,46 @@ func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { return errors.New("cert must be provided with key") } - ca, err := getParameterFromConfigV2(config, "ca", false, true, false, true, "", reflect.TypeOf("")) + ca, err := getParameterFromConfigV2( + config, + "ca", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } meta.ca = ca.(string) - unsafeSslRaw, err := getParameterFromConfigV2(config, "unsafeSsl", true, false, false, true, defaultUnsafeSsl, reflect.TypeOf(true)) + unsafeSslRaw, err := getParameterFromConfigV2( + config, + "unsafeSsl", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(defaultUnsafeSsl), + ) if err != nil { return fmt.Errorf("error parsing unsafeSsl: %w", err) } meta.unsafeSsl = unsafeSslRaw.(bool) - keyPassword, err := getParameterFromConfigV2(config, "keyPassword", false, true, false, true, "", reflect.TypeOf("")) + keyPassword, err := getParameterFromConfigV2( + config, + "keyPassword", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -248,18 +311,45 @@ func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error { return nil } -func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { - username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) +func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { + username, err := getParameterFromConfigV2( + config, + "username", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("no username given. %w", err) } meta.username = strings.TrimSpace(username.(string)) - password, err := getParameterFromConfigV2(config, "password", false, true, false, true, "", reflect.TypeOf("")) + password, err := getParameterFromConfigV2( + config, + "password", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } - keytab, err := getParameterFromConfigV2(config, "keytab", false, true, false, true, "", reflect.TypeOf("")) + keytab, err := getParameterFromConfigV2( + config, + "keytab", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return err } @@ -278,13 +368,31 @@ func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSa meta.keytabPath = path } - realm, err := getParameterFromConfigV2(config, "realm", false, true, false, false, "", reflect.TypeOf("")) + realm, err := getParameterFromConfigV2( + config, + "realm", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("no realm given. %w", err) } meta.realm = strings.TrimSpace(realm.(string)) - kerberosConfig, err := getParameterFromConfigV2(config, "kerberosConfig", false, true, false, false, "", reflect.TypeOf("")) + kerberosConfig, err := getParameterFromConfigV2( + config, + "kerberosConfig", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("no Kerberos configuration file (kerberosConfig) given. %w", err) } @@ -302,14 +410,32 @@ func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSa return nil } -func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { - username, err := getParameterFromConfigV2(config, "username", false, true, false, false, "", reflect.TypeOf("")) +func parseSaslParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { + username, err := getParameterFromConfigV2( + config, + "username", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("no username given. %w", err) } meta.username = strings.TrimSpace(username.(string)) - password, err := getParameterFromConfigV2(config, "password", false, true, false, false, "", reflect.TypeOf("")) + password, err := getParameterFromConfigV2( + config, + "password", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("no password given. %w", err) } @@ -317,20 +443,47 @@ func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslTy meta.saslType = mode if mode == KafkaSASLTypeOAuthbearer { - scopes, err := getParameterFromConfigV2(config, "scopes", false, true, false, true, "", reflect.TypeOf("")) + scopes, err := getParameterFromConfigV2( + config, + "scopes", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("no scopes given. %w", err) } meta.scopes = strings.Split(scopes.(string), ",") - oauthTokenEndpointsURI, err := getParameterFromConfigV2(config, "oauthTokenEndpointUri", false, true, false, false, "", reflect.TypeOf("")) + oauthTokenEndpointsURI, err := getParameterFromConfigV2( + config, + "oauthTokenEndpointUri", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return fmt.Errorf("no oauth token endpoint uri given. %w", err) } meta.oauthTokenEndpointURI = strings.TrimSpace(oauthTokenEndpointsURI.(string)) meta.oauthExtensions = make(map[string]string) - oauthExtensionsRaw, _ := getParameterFromConfigV2(config, "oauthExtensions", false, true, false, true, "", reflect.TypeOf("")) + oauthExtensionsRaw, _ := getParameterFromConfigV2( + config, + "oauthExtensions", + reflect.TypeOf(""), + UseMetadata(false), + UseAuthentication(true), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if oauthExtensionsRaw != "" { for _, extension := range strings.Split(oauthExtensionsRaw.(string), ",") { splitExtension := strings.Split(extension, "=") @@ -373,19 +526,46 @@ func saveToFile(content string) (string, error) { func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (kafkaMetadata, error) { meta := kafkaMetadata{} - bootstrapServers, err := getParameterFromConfigV2(config, "bootstrapServers", true, false, true, false, "", reflect.TypeOf("")) + bootstrapServers, err := getParameterFromConfigV2( + config, + "bootstrapServers", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(true), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return meta, fmt.Errorf("no bootstrapServers given. %w", err) } meta.bootstrapServers = strings.Split(bootstrapServers.(string), ",") - consumerGroup, err := getParameterFromConfigV2(config, "consumerGroup", true, false, true, false, "", reflect.TypeOf("")) + consumerGroup, err := getParameterFromConfigV2( + config, + "consumerGroup", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(true), + IsOptional(false), + WithDefaultVal(""), + ) if err != nil { return meta, fmt.Errorf("no consumer group given. %w", err) } meta.group = consumerGroup.(string) - topic, err := getParameterFromConfigV2(config, "topic", true, false, true, true, "", reflect.TypeOf("")) + topic, err := getParameterFromConfigV2( + config, + "topic", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(true), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return meta, err } @@ -396,7 +576,16 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) meta.topic = topic.(string) meta.partitionLimitation = nil - partitionLimitationMetadata, err := getParameterFromConfigV2(config, "partitionLimitation", true, false, false, true, "", reflect.TypeOf("")) + partitionLimitationMetadata, err := getParameterFromConfigV2( + config, + "partitionLimitation", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return meta, err } @@ -415,7 +604,16 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) } meta.offsetResetPolicy = defaultOffsetResetPolicy - offsetResetPolicyRaw, err := getParameterFromConfigV2(config, "offsetResetPolicy", true, false, false, true, "", reflect.TypeOf("")) + offsetResetPolicyRaw, err := getParameterFromConfigV2( + config, + "offsetResetPolicy", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(""), + ) if err != nil { return meta, err } @@ -427,7 +625,16 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) meta.offsetResetPolicy = policy } meta.lagThreshold = defaultKafkaLagThreshold - lagThreshold, err := getParameterFromConfigV2(config, lagThresholdMetricName, true, false, false, true, defaultKafkaLagThreshold, reflect.TypeOf(64)) + lagThreshold, err := getParameterFromConfigV2( + config, + lagThresholdMetricName, + reflect.TypeOf(64), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(defaultKafkaLagThreshold), + ) if err != nil { return meta, err } @@ -437,7 +644,16 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) meta.lagThreshold = int64(lagThreshold.(int)) meta.activationLagThreshold = defaultKafkaActivationLagThreshold - activationLagThreshold, err := getParameterFromConfigV2(config, activationLagThresholdMetricName, true, false, false, true, int64(defaultKafkaActivationLagThreshold), reflect.TypeOf(int64(64))) + activationLagThreshold, err := getParameterFromConfigV2( + config, + activationLagThresholdMetricName, + reflect.TypeOf(int64(64)), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(int64(defaultKafkaActivationLagThreshold)), + ) if err != nil { return meta, err } @@ -449,26 +665,62 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) if err := parseKafkaAuthParams(config, &meta); err != nil { return meta, err } - - allowIDConsumers, err := getParameterFromConfigV2(config, "allowIdleConsumers", true, false, false, true, false, reflect.TypeOf(true)) + allowIDConsumers, err := getParameterFromConfigV2( + config, + "allowIdleConsumers", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) if err != nil { return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err) } meta.allowIdleConsumers = allowIDConsumers.(bool) - excludePersistentLag, err := getParameterFromConfigV2(config, "excludePersistentLag", true, false, false, true, false, reflect.TypeOf(true)) + excludePersistentLag, err := getParameterFromConfigV2( + config, + "excludePersistentLag", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) if err != nil { return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err) } meta.excludePersistentLag = excludePersistentLag.(bool) - scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2(config, "scaleToZeroOnInvalidOffset", true, false, false, true, false, reflect.TypeOf(true)) + scaleToZeroOnInvalidOffset, err := getParameterFromConfigV2( + config, + "scaleToZeroOnInvalidOffset", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) + if err != nil { return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err) } meta.scaleToZeroOnInvalidOffset = scaleToZeroOnInvalidOffset.(bool) - limitToPartitionsWithLag, err := getParameterFromConfigV2(config, "limitToPartitionsWithLag", true, false, false, true, false, reflect.TypeOf(true)) + limitToPartitionsWithLag, err := getParameterFromConfigV2( + config, + "limitToPartitionsWithLag", + reflect.TypeOf(true), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal(false), + ) if err != nil { return meta, err } @@ -480,7 +732,16 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") } - saramaVer, err := getParameterFromConfigV2(config, "version", true, false, false, true, "1.0.0", reflect.TypeOf("")) + saramaVer, err := getParameterFromConfigV2( + config, + "version", + reflect.TypeOf(""), + UseMetadata(true), + UseAuthentication(false), + UseResolvedEnv(false), + IsOptional(true), + WithDefaultVal("1.0.0"), + ) if err != nil { return meta, err } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index b69a07fb2b5..4742130d9ed 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -322,7 +322,7 @@ var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ func TestGetBrokers(t *testing.T) { for _, testData := range parseKafkaMetadataTestDataset { - meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard()) + meta, err := parseKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard()) getBrokerTestBase(t, meta, testData, err) meta, err = parseKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithoutAuthParams}, logr.Discard()) @@ -376,7 +376,7 @@ func getBrokerTestBase(t *testing.T, meta kafkaMetadata, testData parseKafkaMeta func TestKafkaAuthParamsInTriggerAuthentication(t *testing.T) { for idx, testData := range parseKafkaAuthParamsTestDataset { - meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) + meta, err := parseKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) if err != nil && !testData.isError { t.Errorf("Test %v: expected success but got error %v", idx, err) From f250cc1aee9faec0f80a9e3d4b1196cf1c6a32e8 Mon Sep 17 00:00:00 2001 From: Dao Thanh Tung Date: Mon, 19 Feb 2024 20:14:12 +0000 Subject: [PATCH 09/11] refactor from review feedback Signed-off-by: Dao Thanh Tung --- pkg/scalers/apache_kafka_scaler.go | 127 +++++++-------------- pkg/scalers/kafka_scaler.go | 173 ++++++++++------------------- pkg/scalers/scaler.go | 18 +-- pkg/scalers/scaler_test.go | 23 +++- 4 files changed, 127 insertions(+), 214 deletions(-) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 6edaf51d5e3..803daf7172d 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -125,10 +125,9 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "tls", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + UseAuthentication(), + IsOptional(), WithDefaultVal("disable"), ) if err != nil { @@ -149,10 +148,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "cert", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -162,10 +159,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "key", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -181,10 +176,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "ca", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -197,10 +190,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "keyPassword", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -215,10 +206,9 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "sasl", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -234,10 +224,9 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "awsEndpoint", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -253,10 +242,7 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "awsRegion", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(false), + UseMetadata(), ) if err != nil { return fmt.Errorf("%w. No awsRegion given", err) @@ -276,10 +262,7 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "username", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -290,10 +273,7 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache config, "password", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -316,10 +296,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "bootstrapServers", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(true), - IsOptional(false), + UseMetadata(), + UseResolvedEnv(), WithDefaultVal(""), ) if err != nil { @@ -331,10 +309,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "consumerGroup", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(true), - IsOptional(false), + UseMetadata(), + UseResolvedEnv(), WithDefaultVal(""), ) if err != nil { @@ -345,10 +321,9 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "topic", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(true), - IsOptional(true), + UseMetadata(), + UseResolvedEnv(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -367,10 +342,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "partitionLimitation", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(""), ) @@ -396,10 +369,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "offsetResetPolicy", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -418,10 +389,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, lagThresholdMetricName, reflect.TypeOf(64), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(defaultKafkaLagThreshold), ) @@ -438,10 +407,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, activationLagThresholdMetricName, reflect.TypeOf(int64(64)), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(int64(defaultKafkaActivationLagThreshold)), ) if err != nil { @@ -459,10 +426,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "allowIdleConsumers", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) if err != nil { @@ -474,10 +439,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "excludePersistentLag", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) if err != nil { @@ -489,10 +452,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "scaleToZeroOnInvalidOffset", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) if err != nil { @@ -504,10 +465,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo config, "limitToPartitionsWithLag", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) if err != nil { diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 6d259a4c640..b3775398474 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -162,10 +162,10 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat config, "sasl", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(true), - UseResolvedEnv(true), - IsOptional(true), + UseMetadata(), + UseAuthentication(), + UseResolvedEnv(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -198,10 +198,9 @@ func parseKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadat config, "tls", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + UseAuthentication(), + IsOptional(), WithDefaultVal("disable"), ) if err != nil { @@ -229,10 +228,8 @@ func parseTLS(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { config, "cert", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -244,10 +241,8 @@ func parseTLS(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { config, "key", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -266,10 +261,8 @@ func parseTLS(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { config, "ca", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -281,10 +274,8 @@ func parseTLS(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { config, "unsafeSsl", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(defaultUnsafeSsl), ) if err != nil { @@ -296,10 +287,8 @@ func parseTLS(config *scalersconfig.ScalerConfig, meta *kafkaMetadata) error { config, "keyPassword", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -316,10 +305,7 @@ func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata config, "username", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -331,10 +317,8 @@ func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata config, "password", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -344,10 +328,8 @@ func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata config, "keytab", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -372,10 +354,7 @@ func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata config, "realm", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -387,10 +366,7 @@ func parseKerberosParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata config, "kerberosConfig", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -415,10 +391,7 @@ func parseSaslParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mo config, "username", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -430,10 +403,8 @@ func parseSaslParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mo config, "password", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseMetadata(), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -447,10 +418,8 @@ func parseSaslParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mo config, "scopes", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -462,10 +431,7 @@ func parseSaslParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mo config, "oauthTokenEndpointUri", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(false), + UseAuthentication(), WithDefaultVal(""), ) if err != nil { @@ -478,10 +444,8 @@ func parseSaslParams(config *scalersconfig.ScalerConfig, meta *kafkaMetadata, mo config, "oauthExtensions", reflect.TypeOf(""), - UseMetadata(false), - UseAuthentication(true), - UseResolvedEnv(false), - IsOptional(true), + UseAuthentication(), + IsOptional(), WithDefaultVal(""), ) if oauthExtensionsRaw != "" { @@ -530,10 +494,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "bootstrapServers", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(true), - IsOptional(false), + UseMetadata(), + UseResolvedEnv(), WithDefaultVal(""), ) if err != nil { @@ -545,10 +507,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "consumerGroup", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(true), - IsOptional(false), + UseMetadata(), + UseResolvedEnv(), WithDefaultVal(""), ) if err != nil { @@ -560,10 +520,9 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "topic", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(true), - IsOptional(true), + UseMetadata(), + UseResolvedEnv(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -580,10 +539,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "partitionLimitation", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -608,10 +565,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "offsetResetPolicy", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(""), ) if err != nil { @@ -629,10 +584,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, lagThresholdMetricName, reflect.TypeOf(64), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(defaultKafkaLagThreshold), ) if err != nil { @@ -648,10 +601,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, activationLagThresholdMetricName, reflect.TypeOf(int64(64)), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(int64(defaultKafkaActivationLagThreshold)), ) if err != nil { @@ -669,10 +620,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "allowIdleConsumers", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) if err != nil { @@ -684,10 +633,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "excludePersistentLag", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) if err != nil { @@ -699,10 +646,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "scaleToZeroOnInvalidOffset", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) @@ -715,10 +660,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "limitToPartitionsWithLag", reflect.TypeOf(true), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal(false), ) if err != nil { @@ -736,10 +679,8 @@ func parseKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) config, "version", reflect.TypeOf(""), - UseMetadata(true), - UseAuthentication(false), - UseResolvedEnv(false), - IsOptional(true), + UseMetadata(), + IsOptional(), WithDefaultVal("1.0.0"), ) if err != nil { diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go index df37eb210e0..980a284cc87 100644 --- a/pkg/scalers/scaler.go +++ b/pkg/scalers/scaler.go @@ -180,30 +180,30 @@ type configOptions struct { } // UseMetadata is an Option function that sets the useMetadata field of configOptions. -func UseMetadata(metadata bool) Option { +func UseMetadata() Option { return func(opt *configOptions) { - opt.useMetadata = metadata + opt.useMetadata = true } } // UseAuthentication is an Option function that sets the useAuthentication field of configOptions. -func UseAuthentication(auth bool) Option { +func UseAuthentication() Option { return func(opt *configOptions) { - opt.useAuthentication = auth + opt.useAuthentication = true } } // UseResolvedEnv is an Option function that sets the useResolvedEnv field of configOptions. -func UseResolvedEnv(resolvedEnv bool) Option { +func UseResolvedEnv() Option { return func(opt *configOptions) { - opt.useResolvedEnv = resolvedEnv + opt.useResolvedEnv = true } } // IsOptional is an Option function that sets the isOptional field of configOptions. -func IsOptional(optional bool) Option { +func IsOptional() Option { return func(opt *configOptions) { - opt.isOptional = optional + opt.isOptional = true } } @@ -235,7 +235,7 @@ func WithDefaultVal(defaultVal interface{}) Option { // To retrieve a parameter value from a ScalerConfig object, you can call this function with the necessary parameters and options // // ``` -// val, err := getParameterFromConfigV2(scalerConfig, "parameterName", reflect.TypeOf(int64(0)), UseMetadata(true), UseAuthentication(true)) +// val, err := getParameterFromConfigV2(scalerConfig, "parameterName", reflect.TypeOf(int64(0)), UseMetadata(), UseAuthentication()) // if err != nil { // // Handle error // } diff --git a/pkg/scalers/scaler_test.go b/pkg/scalers/scaler_test.go index 59508b9ba58..781ba058b20 100644 --- a/pkg/scalers/scaler_test.go +++ b/pkg/scalers/scaler_test.go @@ -254,15 +254,28 @@ var getParameterFromConfigTestDataset = []getParameterFromConfigTestData{ func TestGetParameterFromConfigV2(t *testing.T) { for _, testData := range getParameterFromConfigTestDataset { + options := []Option{ + WithDefaultVal(testData.defaultVal), + } + + if testData.useMetadata { + options = append(options, UseMetadata()) + } + if testData.useAuthentication { + options = append(options, UseAuthentication()) + } + if testData.useResolvedEnv { + options = append(options, UseResolvedEnv()) + } + if testData.isOptional { + options = append(options, IsOptional()) + } + val, err := getParameterFromConfigV2( &scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams, ResolvedEnv: testData.resolvedEnv}, testData.parameter, testData.targetType, - UseMetadata(testData.useMetadata), - UseAuthentication(testData.useAuthentication), - UseResolvedEnv(testData.useResolvedEnv), - IsOptional(testData.isOptional), - WithDefaultVal(testData.defaultVal), + options..., ) if testData.isError { assert.NotNilf(t, err, "test %s: expected error but got success, testData - %+v", testData.name, testData) From 40cde1218a727d36f9ee99a2f25adb5ef814060b Mon Sep 17 00:00:00 2001 From: Dao Thanh Tung Date: Mon, 19 Feb 2024 20:28:50 +0000 Subject: [PATCH 10/11] gofmt fix Signed-off-by: Dao Thanh Tung --- pkg/scalers/scaler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go index 980a284cc87..de3bf69c693 100644 --- a/pkg/scalers/scaler.go +++ b/pkg/scalers/scaler.go @@ -182,28 +182,28 @@ type configOptions struct { // UseMetadata is an Option function that sets the useMetadata field of configOptions. func UseMetadata() Option { return func(opt *configOptions) { - opt.useMetadata = true + opt.useMetadata = true } } // UseAuthentication is an Option function that sets the useAuthentication field of configOptions. func UseAuthentication() Option { return func(opt *configOptions) { - opt.useAuthentication = true + opt.useAuthentication = true } } // UseResolvedEnv is an Option function that sets the useResolvedEnv field of configOptions. func UseResolvedEnv() Option { return func(opt *configOptions) { - opt.useResolvedEnv = true + opt.useResolvedEnv = true } } // IsOptional is an Option function that sets the isOptional field of configOptions. func IsOptional() Option { return func(opt *configOptions) { - opt.isOptional = true + opt.isOptional = true } } From 218c15f903416c85a58ece695aec5fe9ea2eddfd Mon Sep 17 00:00:00 2001 From: dttung2905 Date: Wed, 10 Apr 2024 10:05:37 +0100 Subject: [PATCH 11/11] Fix minor typo Signed-off-by: dttung2905 --- pkg/scalers/apache_kafka_scaler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 803daf7172d..1b0f088dcc0 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -417,6 +417,7 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo if activationLagThreshold.(int64) < 0 { return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) } + meta.activationLagThreshold = activationLagThreshold.(int64) if err := parseApacheKafkaAuthParams(config, &meta); err != nil { return meta, err