Skip to content

Commit

Permalink
fix(producer): use newer ProduceReq as appropriate
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 3, 2023
1 parent 017083e commit 2f4bbaf
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
20 changes: 13 additions & 7 deletions produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,22 @@ func (r *ProduceRequest) isValidVersion() bool {

func (r *ProduceRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_11_0_0
case 7:
return V2_1_0_0
case 6:
return V2_0_0_0
case 4, 5:
return V1_0_0_0
case 3:
return V0_11_0_0
case 2:
return V0_10_0_0
case 1:
return V0_9_0_0
case 0:
return V0_8_2_0
default:
return MinVersion
return V2_1_0_0
}
}

Expand Down
19 changes: 18 additions & 1 deletion produce_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,24 @@ func (r *ProduceResponse) isValidVersion() bool {
}

func (r *ProduceResponse) requiredVersion() KafkaVersion {
return MinVersion
switch r.Version {
case 7:
return V2_1_0_0
case 6:
return V2_0_0_0
case 4, 5:
return V1_0_0_0
case 3:
return V0_11_0_0
case 2:
return V0_10_0_0
case 1:
return V0_9_0_0
case 0:
return V0_8_2_0
default:
return V2_1_0_0
}
}

func (r *ProduceResponse) throttleTime() time.Duration {
Expand Down
9 changes: 7 additions & 2 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,13 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
req.TransactionalID = &ps.parent.conf.Producer.Transaction.ID
}
}

if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
if ps.parent.conf.Version.IsAtLeast(V1_0_0_0) {
req.Version = 5
}
if ps.parent.conf.Version.IsAtLeast(V2_0_0_0) {
req.Version = 6
}
if ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
req.Version = 7
}

Expand Down

0 comments on commit 2f4bbaf

Please sign in to comment.