From a9126add225283b6f1f33291cf5e9e97a7da67e1 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 2 Aug 2023 22:22:30 +0100 Subject: [PATCH] fix(proto): use DeleteRecordsRequest v1 Signed-off-by: Dominic Evans --- admin.go | 3 +++ delete_records_request.go | 9 +++++++-- delete_records_response.go | 9 +++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/admin.go b/admin.go index ef5f481a4..93ab7b863 100644 --- a/admin.go +++ b/admin.go @@ -605,6 +605,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i Topics: topics, Timeout: ca.conf.Admin.Timeout, } + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } rsp, err := broker.DeleteRecords(request) if err != nil { errs = append(errs, err) diff --git a/delete_records_request.go b/delete_records_request.go index 687b042ad..3ca2146af 100644 --- a/delete_records_request.go +++ b/delete_records_request.go @@ -83,11 +83,16 @@ func (d *DeleteRecordsRequest) headerVersion() int16 { } func (d *DeleteRecordsRequest) isValidVersion() bool { - return d.Version == 0 + return d.Version >= 0 && d.Version <= 1 } func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } type DeleteRecordsRequestTopic struct { diff --git a/delete_records_response.go b/delete_records_response.go index cb287524c..2d7db885b 100644 --- a/delete_records_response.go +++ b/delete_records_response.go @@ -85,11 +85,16 @@ func (d *DeleteRecordsResponse) headerVersion() int16 { } func (d *DeleteRecordsResponse) isValidVersion() bool { - return d.Version == 0 + return d.Version >= 0 && d.Version <= 1 } func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *DeleteRecordsResponse) throttleTime() time.Duration {