Skip to content

Commit

Permalink
chore(proto): permit CreatePartitionsRequest V1
Browse files Browse the repository at this point in the history
This is identical in format to V0, but just influences the broker's
response behaviour:
> starting in version 1, on quota violation, brokers send out responses
> before throttling.

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 3, 2023
1 parent 1532d9f commit c32ffd1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
3 changes: 3 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,9 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
Timeout: ca.conf.Admin.Timeout,
ValidateOnly: validateOnly,
}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}

return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
Expand Down
11 changes: 9 additions & 2 deletions create_partitions_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,18 @@ func (r *CreatePartitionsRequest) headerVersion() int16 {
}

func (r *CreatePartitionsRequest) isValidVersion() bool {
return r.Version == 0
return r.Version >= 0 && r.Version <= 1
}

func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
return V1_0_0_0
switch r.Version {
case 1:
return V2_0_0_0
case 0:
return V1_0_0_0
default:
return V2_0_0_0
}
}

type TopicPartition struct {
Expand Down
11 changes: 9 additions & 2 deletions create_partitions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,18 @@ func (r *CreatePartitionsResponse) headerVersion() int16 {
}

func (r *CreatePartitionsResponse) isValidVersion() bool {
return r.Version == 0
return r.Version >= 0 && r.Version <= 1
}

func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
switch r.Version {
case 1:
return V2_0_0_0
case 0:
return V1_0_0_0
default:
return V2_0_0_0
}
}

func (r *CreatePartitionsResponse) throttleTime() time.Duration {
Expand Down

0 comments on commit c32ffd1

Please sign in to comment.