From 7a5c34e19ff3ddbdb94a8deddd12b253e4ea28d7 Mon Sep 17 00:00:00 2001 From: Mark Hindess Date: Wed, 30 Aug 2023 10:17:29 +0100 Subject: [PATCH 1/2] fix: add retry logic to AlterUserScramCredentials According to KIP-554, the AlterUserScramCredentialsRequest description states: It will be will be sent to the controller and will return NOT_CONTROLLER if the receiving broker is not the controller. so this request should handle retries. Signed-off-by: Mark Hindess --- admin.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/admin.go b/admin.go index 21fa10cc9..9a4a80194 100644 --- a/admin.go +++ b/admin.go @@ -1203,12 +1203,16 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU Upsertions: u, } - b, err := ca.Controller() - if err != nil { - return nil, err - } + var rsp *AlterUserScramCredentialsResponse + err := ca.retryOnError(isErrNoController, func() error { + b, err := ca.Controller() + if err != nil { + return err + } - rsp, err := b.AlterUserScramCredentials(req) + rsp, err = b.AlterUserScramCredentials(req) + return err + }) if err != nil { return nil, err } From e25e91dce30748b489b6dffd5a6526059aa9f993 Mon Sep 17 00:00:00 2001 From: Mark Hindess Date: Wed, 30 Aug 2023 10:20:29 +0100 Subject: [PATCH 2/2] fix: typo Signed-off-by: Mark Hindess --- admin.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/admin.go b/admin.go index 9a4a80194..7dabd3737 100644 --- a/admin.go +++ b/admin.go @@ -196,9 +196,9 @@ func (ca *clusterAdmin) refreshController() (*Broker, error) { return ca.client.RefreshController() } -// isErrNoController returns `true` if the given error type unwraps to an +// isErrNotController returns `true` if the given error type unwraps to an // `ErrNotController` response from Kafka -func isErrNoController(err error) bool { +func isErrNotController(err error) bool { return errors.Is(err, ErrNotController) } @@ -249,7 +249,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO request.Version = 1 } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -278,14 +278,14 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNoController, func() error { + err = ca.retryOnError(isErrNotController, func() error { controller, err := ca.Controller() if err != nil { return err } request := NewMetadataRequest(ca.conf.Version, topics) response, err = controller.GetMetadata(request) - if isErrNoController(err) { + if isErrNotController(err) { _, _ = ca.refreshController() } return err @@ -298,7 +298,7 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNoController, func() error { + err = ca.retryOnError(isErrNotController, func() error { controller, err := ca.Controller() if err != nil { return err @@ -306,7 +306,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32 request := NewMetadataRequest(ca.conf.Version, nil) response, err = controller.GetMetadata(request) - if isErrNoController(err) { + if isErrNotController(err) { _, _ = ca.refreshController() } return err @@ -438,7 +438,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error { request.Version = 1 } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -482,7 +482,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ request.Version = 1 } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -523,7 +523,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][ request.AddBlock(topic, int32(i), assignment[i]) } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -570,7 +570,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in request.AddBlock(topic, partitions) var rsp *ListPartitionReassignmentsResponse - err = ca.retryOnError(isErrNoController, func() error { + err = ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -578,7 +578,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in _ = b.Open(ca.client.Config()) rsp, err = b.ListPartitionReassignments(request) - if isErrNoController(err) { + if isErrNotController(err) { _, _ = ca.refreshController() } return err @@ -1204,7 +1204,7 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU } var rsp *AlterUserScramCredentialsResponse - err := ca.retryOnError(isErrNoController, func() error { + err := ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err