diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index 3bef616d2..ef6b9e721 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -10,6 +10,7 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error { tmp := new(FindCoordinatorRequest) tmp.CoordinatorKey = r.ConsumerGroup tmp.CoordinatorType = CoordinatorGroup + tmp.Version = r.Version return tmp.encode(pe) } @@ -35,9 +36,16 @@ func (r *ConsumerMetadataRequest) headerVersion() int16 { } func (r *ConsumerMetadataRequest) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 2 } func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion { - return V0_8_2_0 + switch r.Version { + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } } diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index a8e95b2ce..d99209e3b 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -54,7 +54,7 @@ func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { } tmp := &FindCoordinatorResponse{ - Version: 0, + Version: r.Version, Err: r.Err, Coordinator: r.Coordinator, } @@ -79,9 +79,16 @@ func (r *ConsumerMetadataResponse) headerVersion() int16 { } func (r *ConsumerMetadataResponse) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 2 } func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion { - return V0_8_2_0 + switch r.Version { + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } }