
[AUTHN-1982] Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent #1015

Open
wants to merge 11 commits into master
Updated spire_consumer_example.go and comment description.
chang-you committed Jul 11, 2023
commit 728a1557ebd958127befdcf69adc8963875faad0
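For context on the flow this PR demonstrates: the example fetches a JWT-SVID from the local SPIRE agent's Workload API and hands it to the Kafka client as an OAuthBearer token. The following is a minimal sketch of that retrieval step, not the PR's code; it assumes the go-spiffe/v2 workloadapi package and the confluent-kafka-go/v2 module path, and the helper name fetchSPIREToken and the socket address are placeholders.

package main

import (
	"context"
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/spiffe/go-spiffe/v2/svid/jwtsvid"
	"github.com/spiffe/go-spiffe/v2/workloadapi"
)

// fetchSPIREToken (hypothetical name) asks the SPIRE agent for a JWT-SVID and
// wraps it in the token type that the client's OAUTHBEARER mechanism expects.
func fetchSPIREToken(ctx context.Context, principal, socketPath, audience string) (kafka.OAuthBearerToken, error) {
	svid, err := workloadapi.FetchJWTSVID(
		ctx,
		jwtsvid.Params{Audience: audience},
		// Placeholder address; SPIRE agents commonly listen on a Unix socket
		// such as "unix:///tmp/spire-agent/public/api.sock".
		workloadapi.WithAddr(socketPath),
	)
	if err != nil {
		return kafka.OAuthBearerToken{}, fmt.Errorf("fetching JWT-SVID: %w", err)
	}
	return kafka.OAuthBearerToken{
		TokenValue: svid.Marshal(), // the serialized JWT
		Expiration: svid.Expiry,    // lets the client schedule the next refresh
		Principal:  principal,
		// Extensions such as "logicalCluster" and "identityPoolId" are added
		// here in the example diff below.
		Extensions: map[string]string{},
	}, nil
}

A token built this way is what gets installed via SetOAuthBearerToken when the client raises a refresh event (see the sketch after the consumer diff below).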
75 changes: 48 additions & 27 deletions examples/spire_consumer_example/spire_consumer_example.go
@@ -1,3 +1,20 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Example consumer with a custom SPIRE token implementation.
package main

import (
@@ -58,7 +75,7 @@ func retrieveJWTToken(ctx context.Context, principal, socketPath string, audienc
}

extensions := map[string]string{
"logicalCluster": "lkc-r6gdo0",
"logicalCluster": "lkc-0yoqvq",
"identityPoolId": "pool-W9j5",
}
oauthBearerToken := kafka.OAuthBearerToken{
@@ -95,62 +112,66 @@ func main() {
}

c, err := kafka.NewConsumer(&config)

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Consumer %v\n", c)

err = c.SubscribeTopics([]string{topic}, nil)

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to subscribe to topic: %s\n", topic)
os.Exit(1)
}

run := true
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

ctx := context.Background()
go func() {
for {

for run {
select {
case sig := <-signalChannel:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
- _, err := c.StoreOffsets([]kafka.TopicPartition{e.TopicPartition})
+ _, err := c.StoreMessage(e)
Contributor:
Remove the StoreMessage call and the subsequent error check, as they're not needed after removing "enable.auto.offset.store".
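As a hedged aside to illustrate this point (not part of the PR): an explicit StoreMessage is only meaningful when the consumer is configured with "enable.auto.offset.store": false; with the default of true, librdkafka stores offsets on its own and the manual call plus error check add nothing. A sketch, assuming the confluent-kafka-go/v2 module path and a hypothetical helper name:

package main

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// storeOffsetManually (hypothetical helper) is only worth calling when the
// consumer was created with "enable.auto.offset.store": false in its
// kafka.ConfigMap; otherwise the client already stores offsets itself.
func storeOffsetManually(c *kafka.Consumer, e *kafka.Message) {
	if _, err := c.StoreMessage(e); err != nil {
		fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s: %v\n",
			e.TopicPartition, err)
	}
}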

if err != nil {
fmt.Fprintf(os.Stderr, "%% Error storing offset: %v\n", err)
fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
e.TopicPartition)
}
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
// automatically recover.
// But in this example we choose to terminate
// the application if all brokers are down.
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
Contributor:
This if block isn't needed; you can remove the comment too, and replace it with just:

// Errors should generally be considered
// informational, the client will try to
// automatically recover.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)

fmt.Fprintf(os.Stderr, "%% All brokers are down: terminating\n")
return
run = false
}
case kafka.OAuthBearerTokenRefresh:
handleJWTTokenRefreshEvent(ctx, c, principal, socketPath, audience)
default:
fmt.Printf("Ignored %v\n", e)
}
}
}()

err = c.Subscribe(topic, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to subscribe to topic: %s\n", topic)
os.Exit(1)
}

run := true
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

for run {
select {
case sig := <-signalChannel:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
}
}

fmt.Printf("Closing consumer\n")
18 changes: 17 additions & 1 deletion examples/spire_producer.example/spire_producer.example.go
Contributor:
Regarding this file: would it make sense to keep just one file (either the consumer or the producer, but not both)?
Since this is an example to demonstrate the usage of a custom OAuth handler, what we're actually doing with the client shouldn't be the main focus of the example. What do you think?

Author:
Thanks for the comments! That makes sense too.
@arvindth what do you think? Thank you!

Member:
I agree, a single producer or consumer example is probably good enough.

Author (@chang-you, Oct 10, 2023):
@milindl @arvindth I noticed that under the examples folder, most of the existing examples (protobuf/oauthbearer/json/avro) include both a producer example and a consumer example. Should we keep both?

@@ -1,3 +1,19 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Example producer with a custom SPIRE token implementation.
package main

import (
@@ -60,7 +76,7 @@ func retrieveJWTToken(ctx context.Context, principal, socketPath string, audienc
}

extensions := map[string]string{
"logicalCluster": "lkc-r6gdo0",
"logicalCluster": "lkc-0yoqvq",
"identityPoolId": "pool-W9j5",
}
oauthBearerToken := kafka.OAuthBearerToken{