From e8523df2398483603f5da97ab66a9de7e5205dd2 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 3 Aug 2023 10:22:48 +0100 Subject: [PATCH] fix(producer): use newer ProduceReq as appropriate --- produce_request.go | 22 ++++++++++++++-------- produce_response.go | 19 ++++++++++++++++++- produce_set.go | 9 +++++++-- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/produce_request.go b/produce_request.go index aed2275332..b1f7ccee14 100644 --- a/produce_request.go +++ b/produce_request.go @@ -218,16 +218,22 @@ func (r *ProduceRequest) isValidVersion() bool { func (r *ProduceRequest) requiredVersion() KafkaVersion { switch r.Version { - case 1: - return V0_9_0_0 - case 2: - return V0_10_0_0 - case 3: - return V0_11_0_0 - case 7: + case 7: return V2_1_0_0 + case 6: + return V2_0_0_0 + case 4, 5: + return V1_0_0_0 + case 3: + return V0_11_0_0 + case 2: + return V0_10_0_0 + case 1: + return V0_9_0_0 + case 0: + return V0_8_2_0 default: - return MinVersion + return V2_1_0_0 } } diff --git a/produce_response.go b/produce_response.go index 37ad8db2dc..8845abbd40 100644 --- a/produce_response.go +++ b/produce_response.go @@ -180,7 +180,24 @@ func (r *ProduceResponse) isValidVersion() bool { } func (r *ProduceResponse) requiredVersion() KafkaVersion { - return MinVersion + switch r.Version { + case 7: + return V2_1_0_0 + case 6: + return V2_0_0_0 + case 4, 5: + return V1_0_0_0 + case 3: + return V0_11_0_0 + case 2: + return V0_10_0_0 + case 1: + return V0_9_0_0 + case 0: + return V0_8_2_0 + default: + return V2_1_0_0 + } } func (r *ProduceResponse) throttleTime() time.Duration { diff --git a/produce_set.go b/produce_set.go index 8d6980479e..004fc64903 100644 --- a/produce_set.go +++ b/produce_set.go @@ -141,8 +141,13 @@ func (ps *produceSet) buildRequest() *ProduceRequest { req.TransactionalID = &ps.parent.conf.Producer.Transaction.ID } } - - if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) { + if ps.parent.conf.Version.IsAtLeast(V1_0_0_0) { + req.Version = 5 + } + if ps.parent.conf.Version.IsAtLeast(V2_0_0_0) { + req.Version = 6 + } + if ps.parent.conf.Version.IsAtLeast(V2_1_0_0) { req.Version = 7 }