From 2b5090b4c57eaf5f34b3b1016d4eadcaccd0c0ff Mon Sep 17 00:00:00 2001 From: Junli Wang Date: Thu, 27 Feb 2025 05:43:26 -0800 Subject: [PATCH] fix: always use sasl oauthbearer for sarama client (#6020) * fix: always uses sasl oauthbearer for sarama client * fix tests --- go.mod | 1 - go.sum | 3 - .../resource_ibm_event_streams_topic.go | 98 +++++-------------- 3 files changed, 26 insertions(+), 76 deletions(-) diff --git a/go.mod b/go.mod index ef6fa5fb16..5fc9a7ffe5 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,6 @@ require ( github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/go-openapi/strfmt v0.23.0 github.com/golang-jwt/jwt v3.2.2+incompatible - github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/hashicorp/go-uuid v1.0.3 diff --git a/go.sum b/go.sum index 1a4aacdf5b..dc15794366 100644 --- a/go.sum +++ b/go.sum @@ -10,7 +10,6 @@ cloud.google.com/go v0.51.0/go.mod h1:hWtGJ6gnXH+KgDv+V0zFGDvpi07n3z8ZNj3T1RW0Gc cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4= cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M= cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc= -cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc= cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk= cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= @@ -552,8 +551,6 @@ github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.3.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= -github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= -github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= diff --git a/ibm/service/eventstreams/resource_ibm_event_streams_topic.go b/ibm/service/eventstreams/resource_ibm_event_streams_topic.go index 4d3012be87..39a6faf35b 100644 --- a/ibm/service/eventstreams/resource_ibm_event_streams_topic.go +++ b/ibm/service/eventstreams/resource_ibm_event_streams_topic.go @@ -5,9 +5,6 @@ package eventstreams import ( "context" - "encoding/base64" - "encoding/json" - "errors" "fmt" "log" "slices" @@ -21,7 +18,6 @@ import ( "github.com/IBM/go-sdk-core/v5/core" iamidentity "github.com/IBM/platform-services-go-sdk/iamidentityv1" "github.com/IBM/sarama" - jwt "github.com/golang-jwt/jwt/v5" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" ) @@ -282,12 +278,6 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C } instanceCRN = getInstanceCRN(topicID) } - var adminClient sarama.ClusterAdmin - var ok bool - if adminClient, ok = clientPool[instanceCRN]; ok { - log.Printf("[DEBUG] createSaramaAdminClient got client from pool for instance %s", instanceCRN) - return adminClient, instanceCRN, nil - } instance, err := getInstanceDetails(instanceCRN, meta) if err != nil { return nil, "", err @@ -299,6 +289,12 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C slices.Sort(brokerAddress) d.Set("kafka_brokers_sasl", brokerAddress) log.Printf("[INFO] createSaramaAdminClient kafka_brokers_sasl is set to %s", brokerAddress) + var adminClient sarama.ClusterAdmin + var ok bool + if adminClient, ok = clientPool[instanceCRN]; ok { + log.Printf("[DEBUG] createSaramaAdminClient got client from pool for instance %s", instanceCRN) + return adminClient, instanceCRN, nil + } config := sarama.NewConfig() config.ClientID = fmt.Sprintf("terraform-provider-ibm/%s", version.Version) config.Net.SASL.Enable = true @@ -311,17 +307,10 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C config.Net.SASL.AuthIdentity = instanceCRN } config.Admin.Timeout = adminClientTimeout - if bxSession.Config.BluemixAPIKey != "" { - config.Net.SASL.User = "token" - config.Net.SASL.Password = bxSession.Config.BluemixAPIKey - config.Net.SASL.Mechanism = sarama.SASLTypePlaintext - log.Printf("[DEBUG] createSaramaAdminClient configured SASL mechanism=PLAIN") - } else if _, err = validateToken(bxSession.Config.IAMAccessToken); err == nil { - config.Net.SASL.Mechanism = sarama.SASLTypeOAuth - config.Net.SASL.TokenProvider = accessTokenProvider{clientSession: bxSession} - log.Printf("[DEBUG] createSaramaAdminClient configured SASL mechanism=OAUTHBEARER") - } else { - return nil, "", errors.New("either IBMCLOUD_API_KEY or IAM_TOKEN needs to be configured") + config.Net.SASL.Mechanism = sarama.SASLTypeOAuth + config.Net.SASL.TokenProvider, err = newAccessTokenProvider(bxSession) + if err != nil { + return nil, "", err } adminClient, err = sarama.NewClusterAdmin(brokerAddress, config) if err != nil { @@ -373,64 +362,29 @@ func getInstanceCRN(topicID string) string { } type accessTokenProvider struct { - clientSession *session.Session + authenticator *core.IamAuthenticator } -// Token() implements sarama.AccessTokenProvider interface for sasl.mechanism=OAUTHBEARER -func (tp accessTokenProvider) Token() (*sarama.AccessToken, error) { - token, err := validateToken(tp.clientSession.Config.IAMAccessToken) +func newAccessTokenProvider(sess *session.Session) (*accessTokenProvider, error) { + authenticator, err := core.NewIamAuthenticatorBuilder(). + SetURL(conns.EnvFallBack([]string{"IBMCLOUD_IAM_API_ENDPOINT"}, iamidentity.DefaultServiceURL)). + SetApiKey(sess.Config.BluemixAPIKey). + SetRefreshToken(sess.Config.IAMRefreshToken). + SetClientIDSecret("bx", "bx"). + Build() if err != nil { - log.Printf("[DEBUG] accessTokenProvider.Token() error:%s", err) + log.Printf("[DEBUG] newAccessTokenProvider() error:%s", err) return nil, err } - if expired(token) { - authenticator := &core.IamAuthenticator{ - RefreshToken: tp.clientSession.Config.IAMRefreshToken, - ClientId: "bx", - ClientSecret: "bx", - URL: conns.EnvFallBack([]string{"IBMCLOUD_IAM_API_ENDPOINT"}, iamidentity.DefaultServiceURL), - } - token, err = authenticator.GetToken() - if err != nil { - log.Printf("[DEBUG] accessTokenProvider.authenticator.GetToken() error:%s", err) - return nil, err - } - } - return &sarama.AccessToken{Token: token}, nil -} - -func validateToken(token string) (string, error) { - if len(token) == 0 { - return "", errors.New("IAMAccessToken is required") - } - token = strings.TrimPrefix(token, "Bearer") - token = strings.Trim(token, " ") - if len(strings.Split(token, ".")) != 3 { - return "", errors.New("IAMAccessToken is malformed") - } - return token, nil + return &accessTokenProvider{authenticator}, nil } -func expired(token string) (expired bool) { - expired = true - tokenString, err := base64.RawURLEncoding.DecodeString(strings.Split(token, ".")[1]) - if err != nil { - log.Printf("[DEBUG] expired.DecodeString() error:%s", err) - return - } - claims := jwt.RegisteredClaims{} - err = json.Unmarshal(tokenString, &claims) +// Token() implements sarama.AccessTokenProvider interface for sasl.mechanism=OAUTHBEARER +func (tp *accessTokenProvider) Token() (*sarama.AccessToken, error) { + token, err := tp.authenticator.GetToken() if err != nil { - log.Printf("[DEBUG] expired.Unmarshal() error:%s", err) - return - } - if claims.ID == "" { - log.Printf("[DEBUG] expired.jit is empty") - return - } - if claims.ExpiresAt.IsZero() { - log.Printf("[DEBUG] expired.exp is zero") - return + log.Printf("[DEBUG] accessTokenProvider.GetToken() error:%s", err) + return nil, err } - return claims.ExpiresAt.Before(time.Now().Add(10 * time.Second)) + return &sarama.AccessToken{Token: token}, nil }