Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add isValidVersion to protocol types #2538

Merged
merged 8 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions acl_create_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}

func (c *CreateAclsRequest) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 1
}

func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
Expand Down
14 changes: 12 additions & 2 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// CreateAclsResponse is a an acl response creation type
type CreateAclsResponse struct {
Version int16
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
}
Expand Down Expand Up @@ -52,15 +53,24 @@ func (c *CreateAclsResponse) key() int16 {
}

func (c *CreateAclsResponse) version() int16 {
return 0
return c.Version
}

func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}

func (c *CreateAclsResponse) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 1
}

func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch c.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *CreateAclsResponse) throttleTime() time.Duration {
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (d *DeleteAclsRequest) headerVersion() int16 {
return 1
}

func (d *DeleteAclsRequest) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
11 changes: 10 additions & 1 deletion acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,17 @@ func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}

func (d *DeleteAclsResponse) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *DeleteAclsResponse) throttleTime() time.Duration {
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}

func (d *DescribeAclsRequest) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}

func (d *DescribeAclsResponse) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
16 changes: 14 additions & 2 deletions add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnRequest struct {
Version int16
TransactionalID string
ProducerID int64
ProducerEpoch int16
Expand Down Expand Up @@ -45,13 +46,24 @@ func (a *AddOffsetsToTxnRequest) key() int16 {
}

func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
return a.Version
}

func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddOffsetsToTxnRequest) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
16 changes: 14 additions & 2 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddOffsetsToTxnResponse struct {
Version int16
ThrottleTime time.Duration
Err KError
}
Expand Down Expand Up @@ -37,15 +38,26 @@ func (a *AddOffsetsToTxnResponse) key() int16 {
}

func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
return a.Version
}

func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *AddOffsetsToTxnResponse) throttleTime() time.Duration {
Expand Down
16 changes: 14 additions & 2 deletions add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AddPartitionsToTxnRequest is a add partition request
type AddPartitionsToTxnRequest struct {
Version int16
TransactionalID string
ProducerID int64
ProducerEpoch int16
Expand Down Expand Up @@ -69,13 +70,24 @@ func (a *AddPartitionsToTxnRequest) key() int16 {
}

func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
return a.Version
}

func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddPartitionsToTxnRequest) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
16 changes: 14 additions & 2 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// AddPartitionsToTxnResponse is a partition errors to transaction type
type AddPartitionsToTxnResponse struct {
Version int16
ThrottleTime time.Duration
Errors map[string][]*PartitionError
}
Expand Down Expand Up @@ -76,15 +77,26 @@ func (a *AddPartitionsToTxnResponse) key() int16 {
}

func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
return a.Version
}

func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *AddPartitionsToTxnResponse) throttleTime() time.Duration {
Expand Down
13 changes: 12 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
Topics: topics,
Timeout: ca.conf.Admin.Timeout,
}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}
rsp, err := broker.DeleteRecords(request)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -1061,7 +1064,11 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
request := &DescribeLogDirsRequest{}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}
response, err := b.DescribeLogDirs(request)
if err != nil {
errChan <- err
return
Expand Down Expand Up @@ -1208,6 +1215,10 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie
}

func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
}

controller, err := ca.client.Coordinator(groupId)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion alter_client_quotas_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sarama
// validate_only => BOOLEAN

type AlterClientQuotasRequest struct {
Version int16
Entries []AlterClientQuotasEntry // The quota configuration entries to alter.
ValidateOnly bool // Whether the alteration should be validated, but not performed.
}
Expand Down Expand Up @@ -182,13 +183,17 @@ func (a *AlterClientQuotasRequest) key() int16 {
}

func (a *AlterClientQuotasRequest) version() int16 {
return 0
return a.Version
}

func (a *AlterClientQuotasRequest) headerVersion() int16 {
return 1
}

func (a *AlterClientQuotasRequest) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterClientQuotasRequest) requiredVersion() KafkaVersion {
return V2_6_0_0
}
7 changes: 6 additions & 1 deletion alter_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// entity_name => NULLABLE_STRING

type AlterClientQuotasResponse struct {
Version int16
ThrottleTime time.Duration // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
Entries []AlterClientQuotasEntryResponse // The quota configuration entries altered.
}
Expand Down Expand Up @@ -133,13 +134,17 @@ func (a *AlterClientQuotasResponse) key() int16 {
}

func (a *AlterClientQuotasResponse) version() int16 {
return 0
return a.Version
}

func (a *AlterClientQuotasResponse) headerVersion() int16 {
return 0
}

func (a *AlterClientQuotasResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
}
Expand Down
7 changes: 6 additions & 1 deletion alter_configs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AlterConfigsRequest is an alter config request type
type AlterConfigsRequest struct {
Version int16
Resources []*AlterConfigsResource
ValidateOnly bool
}
Expand Down Expand Up @@ -114,13 +115,17 @@ func (a *AlterConfigsRequest) key() int16 {
}

func (a *AlterConfigsRequest) version() int16 {
return 0
return a.Version
}

func (a *AlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *AlterConfigsRequest) isValidVersion() bool {
return a.Version == 0
Comment on lines +125 to +126
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should support version 1 as well since the request is unchanged and response says:

Starting in version 1, on quota violation brokers send out responses before throttling.

}

func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
9 changes: 7 additions & 2 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// AlterConfigsResponse is a response type for alter config
type AlterConfigsResponse struct {
Version int16
ThrottleTime time.Duration
Resources []*AlterConfigsResourceResponse
}
Expand Down Expand Up @@ -100,17 +101,21 @@ func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) e
}

func (a *AlterConfigsResponse) key() int16 {
return 32
return 33
}

func (a *AlterConfigsResponse) version() int16 {
return 0
return a.Version
}

func (a *AlterConfigsResponse) headerVersion() int16 {
return 0
}

func (a *AlterConfigsResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions alter_partition_reassignments_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 {
return 2
}

func (r *AlterPartitionReassignmentsRequest) isValidVersion() bool {
return r.Version == 0
}

func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
return V2_4_0_0
}
Expand Down
Loading