diff --git a/admin.go b/admin.go index 21fa10cc9..7dabd3737 100644 --- a/admin.go +++ b/admin.go @@ -196,9 +196,9 @@ func (ca *clusterAdmin) refreshController() (*Broker, error) { return ca.client.RefreshController() } -// isErrNoController returns `true` if the given error type unwraps to an +// isErrNotController returns `true` if the given error type unwraps to an // `ErrNotController` response from Kafka -func isErrNoController(err error) bool { +func isErrNotController(err error) bool { return errors.Is(err, ErrNotController) } @@ -249,7 +249,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO request.Version = 1 } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -278,14 +278,14 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNoController, func() error { + err = ca.retryOnError(isErrNotController, func() error { controller, err := ca.Controller() if err != nil { return err } request := NewMetadataRequest(ca.conf.Version, topics) response, err = controller.GetMetadata(request) - if isErrNoController(err) { + if isErrNotController(err) { _, _ = ca.refreshController() } return err @@ -298,7 +298,7 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNoController, func() error { + err = ca.retryOnError(isErrNotController, func() error { controller, err := ca.Controller() if err != nil { return err @@ -306,7 +306,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32 request := NewMetadataRequest(ca.conf.Version, nil) response, err = controller.GetMetadata(request) - if isErrNoController(err) { + if isErrNotController(err) { _, _ = ca.refreshController() } return err @@ -438,7 +438,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error { request.Version = 1 } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -482,7 +482,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ request.Version = 1 } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -523,7 +523,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][ request.AddBlock(topic, int32(i), assignment[i]) } - return ca.retryOnError(isErrNoController, func() error { + return ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -570,7 +570,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in request.AddBlock(topic, partitions) var rsp *ListPartitionReassignmentsResponse - err = ca.retryOnError(isErrNoController, func() error { + err = ca.retryOnError(isErrNotController, func() error { b, err := ca.Controller() if err != nil { return err @@ -578,7 +578,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in _ = b.Open(ca.client.Config()) rsp, err = b.ListPartitionReassignments(request) - if isErrNoController(err) { + if isErrNotController(err) { _, _ = ca.refreshController() } return err @@ -1203,12 +1203,16 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU Upsertions: u, } - b, err := ca.Controller() - if err != nil { - return nil, err - } + var rsp *AlterUserScramCredentialsResponse + err := ca.retryOnError(isErrNotController, func() error { + b, err := ca.Controller() + if err != nil { + return err + } - rsp, err := b.AlterUserScramCredentials(req) + rsp, err = b.AlterUserScramCredentials(req) + return err + }) if err != nil { return nil, err }