Skip to content

Commit

Permalink
fix(proto): correct consumer metadata shim
Browse files Browse the repository at this point in the history
This is really just proxying to
FindCoordinatorRequest/FindCoordinatorResponse, but for now just copy in
the same isValidVersion/requiredVersion code and ensure we're passing
Version to and from it correctly.

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 2, 2023
1 parent 6039dab commit 15e0ebd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
12 changes: 10 additions & 2 deletions consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
tmp := new(FindCoordinatorRequest)
tmp.CoordinatorKey = r.ConsumerGroup
tmp.CoordinatorType = CoordinatorGroup
tmp.Version = r.Version
return tmp.encode(pe)
}

Expand All @@ -35,9 +36,16 @@ func (r *ConsumerMetadataRequest) headerVersion() int16 {
}

func (r *ConsumerMetadataRequest) isValidVersion() bool {
return r.Version == 0
return r.Version >= 0 && r.Version <= 2
}

func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion {
return V0_8_2_0
switch r.Version {
case 2:
return V2_0_0_0
case 1:
return V0_11_0_0
default:
return V0_8_2_0
}
}
13 changes: 10 additions & 3 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
}

tmp := &FindCoordinatorResponse{
Version: 0,
Version: r.Version,
Err: r.Err,
Coordinator: r.Coordinator,
}
Expand All @@ -79,9 +79,16 @@ func (r *ConsumerMetadataResponse) headerVersion() int16 {
}

func (r *ConsumerMetadataResponse) isValidVersion() bool {
return r.Version == 0
return r.Version >= 0 && r.Version <= 2
}

func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
return V0_8_2_0
switch r.Version {
case 2:
return V2_0_0_0
case 1:
return V0_11_0_0
default:
return V0_8_2_0
}
}

0 comments on commit 15e0ebd

Please sign in to comment.