Skip to content

Commit

Permalink
fix(proto): use DeleteRecordsRequest v1
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 2, 2023
1 parent b8cc2b1 commit a9126ad
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
3 changes: 3 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
Topics: topics,
Timeout: ca.conf.Admin.Timeout,
}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}
rsp, err := broker.DeleteRecords(request)
if err != nil {
errs = append(errs, err)
Expand Down
9 changes: 7 additions & 2 deletions delete_records_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,16 @@ func (d *DeleteRecordsRequest) headerVersion() int16 {
}

func (d *DeleteRecordsRequest) isValidVersion() bool {
return d.Version == 0
return d.Version >= 0 && d.Version <= 1
}

func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

type DeleteRecordsRequestTopic struct {
Expand Down
9 changes: 7 additions & 2 deletions delete_records_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,16 @@ func (d *DeleteRecordsResponse) headerVersion() int16 {
}

func (d *DeleteRecordsResponse) isValidVersion() bool {
return d.Version == 0
return d.Version >= 0 && d.Version <= 1
}

func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *DeleteRecordsResponse) throttleTime() time.Duration {
Expand Down

0 comments on commit a9126ad

Please sign in to comment.