Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AUTHN-1982] Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent #1015

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Updated closer error and changed oauthConf to principal
chang-you committed Jul 7, 2023
commit 1fdc6b0aaa6233b58530d75e3b87a865bbe55dfa
23 changes: 11 additions & 12 deletions examples/spire_producer.example/spire_producer.example.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding this file: Would it make sense to keep just one file (either the consumer or the producer, but not both)?
Since this is an example to demonstrate the usage of a custom oauth handler, what we're actually doing with the client shouldn't be the main focus of the example, what do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments! That makes sense too.
@arvindth what do you think? Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, a single producer or consumer example is probably good enough.

Copy link
Author

@chang-you chang-you Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@milindl @arvindth but I noticed that under the examples folder, most of the previous examples: protobuf/oauthbearer/json/avro include both producer example and consumer example, should we keep both?

Original file line number Diff line number Diff line change
@@ -42,7 +42,8 @@ type tokenAuth struct {
// previously-received token is 80% of the way to its expiration time).
func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, principal, socketPath string, audience []string) {
fmt.Fprintf(os.Stderr, "Token refresh\n")
oauthBearerToken, retrieveErr := retrieveJWTToken(ctx, principal, socketPath, audience)
oauthBearerToken, closer, retrieveErr := retrieveJWTToken(ctx, principal, socketPath, audience)
defer closer()
if retrieveErr != nil {
fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr)
client.SetOAuthBearerTokenFailure(retrieveErr.Error())
@@ -55,13 +56,13 @@ func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, princi
}
}

func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string) (kafka.OAuthBearerToken, error) {
func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string) (kafka.OAuthBearerToken, func() error, error) {
jwtSource, err := workloadapi.NewJWTSource(
ctx,
workloadapi.WithClientOptions(workloadapi.WithAddr(socketPath)),
)
if err != nil {
return kafka.OAuthBearerToken{}, fmt.Errorf("unable to create JWTSource: %w", err)
return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to create JWTSource: %w", err)
}

defer jwtSource.Close()
@@ -74,7 +75,7 @@ func retrieveJWTToken(ctx context.Context, principal, socketPath string, audienc

jwtSVID, err := jwtSource.FetchJWTSVID(ctx, params)
if err != nil {
return kafka.OAuthBearerToken{}, fmt.Errorf("unable to fetch JWT SVID: %w", err)
return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to fetch JWT SVID: %w", err)
}

oauthBearerToken := kafka.OAuthBearerToken{
@@ -84,7 +85,7 @@ func retrieveJWTToken(ctx context.Context, principal, socketPath string, audienc
Extensions: map[string]string{},
}

return oauthBearerToken, nil
return oauthBearerToken, jwtSource.Close, nil
}

func main() {
@@ -98,17 +99,15 @@ func main() {
topic := os.Args[2]
principal := os.Args[3]
socketPath := os.Args[4]
audience := []string{"audience1", "audience2"} // Audience should be defined properly
audience := []string{"audience1", "audience2"}

// You'll probably need to modify this configuration to
// match your environment.
config := kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "OAUTHBEARER",
"sasl.oauthbearer.config": map[string]string{
"principal": principal,
},
"bootstrap.servers": bootstrapServers,
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "OAUTHBEARER",
"sasl.oauthbearer.config": principal,
}

p, err := kafka.NewProducer(&config)