Skip to content

Commit

Permalink
fix: always use sasl oauthbearer for sarama client (#6020)
Browse files Browse the repository at this point in the history
* fix: always uses sasl oauthbearer for sarama client

* fix tests
  • Loading branch information
JunliWang authored Feb 27, 2025
1 parent 61f3ee0 commit 2b5090b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 76 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ require (
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-openapi/strfmt v0.23.0
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-uuid v1.0.3
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ cloud.google.com/go v0.51.0/go.mod h1:hWtGJ6gnXH+KgDv+V0zFGDvpi07n3z8ZNj3T1RW0Gc
cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4=
cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M=
cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc=
cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc=
cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk=
cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
Expand Down Expand Up @@ -552,8 +551,6 @@ github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v4 v4.3.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down
98 changes: 26 additions & 72 deletions ibm/service/eventstreams/resource_ibm_event_streams_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ package eventstreams

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log"
"slices"
Expand All @@ -21,7 +18,6 @@ import (
"github.com/IBM/go-sdk-core/v5/core"
iamidentity "github.com/IBM/platform-services-go-sdk/iamidentityv1"
"github.com/IBM/sarama"
jwt "github.com/golang-jwt/jwt/v5"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)
Expand Down Expand Up @@ -282,12 +278,6 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C
}
instanceCRN = getInstanceCRN(topicID)
}
var adminClient sarama.ClusterAdmin
var ok bool
if adminClient, ok = clientPool[instanceCRN]; ok {
log.Printf("[DEBUG] createSaramaAdminClient got client from pool for instance %s", instanceCRN)
return adminClient, instanceCRN, nil
}
instance, err := getInstanceDetails(instanceCRN, meta)
if err != nil {
return nil, "", err
Expand All @@ -299,6 +289,12 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C
slices.Sort(brokerAddress)
d.Set("kafka_brokers_sasl", brokerAddress)
log.Printf("[INFO] createSaramaAdminClient kafka_brokers_sasl is set to %s", brokerAddress)
var adminClient sarama.ClusterAdmin
var ok bool
if adminClient, ok = clientPool[instanceCRN]; ok {
log.Printf("[DEBUG] createSaramaAdminClient got client from pool for instance %s", instanceCRN)
return adminClient, instanceCRN, nil
}
config := sarama.NewConfig()
config.ClientID = fmt.Sprintf("terraform-provider-ibm/%s", version.Version)
config.Net.SASL.Enable = true
Expand All @@ -311,17 +307,10 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C
config.Net.SASL.AuthIdentity = instanceCRN
}
config.Admin.Timeout = adminClientTimeout
if bxSession.Config.BluemixAPIKey != "" {
config.Net.SASL.User = "token"
config.Net.SASL.Password = bxSession.Config.BluemixAPIKey
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
log.Printf("[DEBUG] createSaramaAdminClient configured SASL mechanism=PLAIN")
} else if _, err = validateToken(bxSession.Config.IAMAccessToken); err == nil {
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider = accessTokenProvider{clientSession: bxSession}
log.Printf("[DEBUG] createSaramaAdminClient configured SASL mechanism=OAUTHBEARER")
} else {
return nil, "", errors.New("either IBMCLOUD_API_KEY or IAM_TOKEN needs to be configured")
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider, err = newAccessTokenProvider(bxSession)
if err != nil {
return nil, "", err
}
adminClient, err = sarama.NewClusterAdmin(brokerAddress, config)
if err != nil {
Expand Down Expand Up @@ -373,64 +362,29 @@ func getInstanceCRN(topicID string) string {
}

type accessTokenProvider struct {
clientSession *session.Session
authenticator *core.IamAuthenticator
}

// Token() implements sarama.AccessTokenProvider interface for sasl.mechanism=OAUTHBEARER
func (tp accessTokenProvider) Token() (*sarama.AccessToken, error) {
token, err := validateToken(tp.clientSession.Config.IAMAccessToken)
func newAccessTokenProvider(sess *session.Session) (*accessTokenProvider, error) {
authenticator, err := core.NewIamAuthenticatorBuilder().
SetURL(conns.EnvFallBack([]string{"IBMCLOUD_IAM_API_ENDPOINT"}, iamidentity.DefaultServiceURL)).
SetApiKey(sess.Config.BluemixAPIKey).
SetRefreshToken(sess.Config.IAMRefreshToken).
SetClientIDSecret("bx", "bx").
Build()
if err != nil {
log.Printf("[DEBUG] accessTokenProvider.Token() error:%s", err)
log.Printf("[DEBUG] newAccessTokenProvider() error:%s", err)
return nil, err
}
if expired(token) {
authenticator := &core.IamAuthenticator{
RefreshToken: tp.clientSession.Config.IAMRefreshToken,
ClientId: "bx",
ClientSecret: "bx",
URL: conns.EnvFallBack([]string{"IBMCLOUD_IAM_API_ENDPOINT"}, iamidentity.DefaultServiceURL),
}
token, err = authenticator.GetToken()
if err != nil {
log.Printf("[DEBUG] accessTokenProvider.authenticator.GetToken() error:%s", err)
return nil, err
}
}
return &sarama.AccessToken{Token: token}, nil
}

func validateToken(token string) (string, error) {
if len(token) == 0 {
return "", errors.New("IAMAccessToken is required")
}
token = strings.TrimPrefix(token, "Bearer")
token = strings.Trim(token, " ")
if len(strings.Split(token, ".")) != 3 {
return "", errors.New("IAMAccessToken is malformed")
}
return token, nil
return &accessTokenProvider{authenticator}, nil
}

func expired(token string) (expired bool) {
expired = true
tokenString, err := base64.RawURLEncoding.DecodeString(strings.Split(token, ".")[1])
if err != nil {
log.Printf("[DEBUG] expired.DecodeString() error:%s", err)
return
}
claims := jwt.RegisteredClaims{}
err = json.Unmarshal(tokenString, &claims)
// Token() implements sarama.AccessTokenProvider interface for sasl.mechanism=OAUTHBEARER
func (tp *accessTokenProvider) Token() (*sarama.AccessToken, error) {
token, err := tp.authenticator.GetToken()
if err != nil {
log.Printf("[DEBUG] expired.Unmarshal() error:%s", err)
return
}
if claims.ID == "" {
log.Printf("[DEBUG] expired.jit is empty")
return
}
if claims.ExpiresAt.IsZero() {
log.Printf("[DEBUG] expired.exp is zero")
return
log.Printf("[DEBUG] accessTokenProvider.GetToken() error:%s", err)
return nil, err
}
return claims.ExpiresAt.Before(time.Now().Add(10 * time.Second))
return &sarama.AccessToken{Token: token}, nil
}

0 comments on commit 2b5090b

Please sign in to comment.