From 3dfbf99dde82a00646ed08e0bf6b72a0ffe63854 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 1 Aug 2023 13:31:23 +0100 Subject: [PATCH 1/8] feat(proto): add isValidVersion to protocol types The intention here is that we can check that a request version value is supported by the protocol encoder/decoder before sending it Signed-off-by: Dominic Evans --- acl_create_request.go | 4 ++ acl_create_response.go | 14 ++++- acl_delete_request.go | 4 ++ acl_delete_response.go | 11 +++- acl_describe_request.go | 4 ++ acl_describe_response.go | 4 ++ add_offsets_to_txn_request.go | 7 ++- add_offsets_to_txn_response.go | 7 ++- add_partitions_to_txn_request.go | 7 ++- add_partitions_to_txn_response.go | 7 ++- alter_client_quotas_request.go | 7 ++- alter_client_quotas_response.go | 7 ++- alter_configs_request.go | 7 ++- alter_configs_response.go | 7 ++- alter_partition_reassignments_request.go | 4 ++ alter_partition_reassignments_response.go | 4 ++ alter_user_scram_credentials_request.go | 4 ++ alter_user_scram_credentials_response.go | 4 ++ api_versions_request.go | 4 ++ api_versions_response.go | 4 ++ consumer_metadata_request.go | 7 ++- consumer_metadata_response.go | 7 ++- create_partitions_request.go | 7 ++- create_partitions_response.go | 7 ++- create_topics_request.go | 10 ++- create_topics_response.go | 10 ++- delete_groups_request.go | 9 ++- delete_groups_response.go | 7 ++- delete_offsets_request.go | 7 ++- delete_offsets_response.go | 7 ++- delete_records_request.go | 7 ++- delete_records_response.go | 6 +- delete_topics_request.go | 8 +++ delete_topics_response.go | 8 +++ describe_client_quotas_request.go | 7 ++- describe_client_quotas_response.go | 7 ++- describe_configs_request.go | 4 ++ describe_configs_response.go | 4 ++ describe_groups_request.go | 4 ++ describe_groups_response.go | 4 ++ describe_log_dirs_request.go | 4 ++ describe_log_dirs_response.go | 4 ++ describe_user_scram_credentials_request.go | 4 ++ describe_user_scram_credentials_response.go | 4 ++ end_txn_request.go | 7 ++- end_txn_response.go | 7 ++- fetch_request.go | 4 ++ fetch_response.go | 4 ++ find_coordinator_request.go | 6 ++ find_coordinator_response.go | 6 ++ heartbeat_request.go | 4 ++ heartbeat_response.go | 4 ++ incremental_alter_configs_request.go | 7 ++- incremental_alter_configs_response.go | 7 ++- init_producer_id_request.go | 4 ++ init_producer_id_response.go | 4 ++ join_group_request.go | 4 ++ join_group_response.go | 4 ++ leave_group_request.go | 15 +++-- leave_group_response.go | 4 ++ list_groups_request.go | 19 +++++- list_groups_response.go | 20 ++++-- list_partition_reassignments_request.go | 4 ++ list_partition_reassignments_response.go | 4 ++ metadata_request.go | 4 ++ metadata_response.go | 4 ++ offset_commit_request.go | 4 ++ offset_commit_response.go | 4 ++ offset_fetch_request.go | 4 ++ offset_fetch_response.go | 4 ++ offset_request.go | 14 ++++- offset_response.go | 14 ++++- produce_request.go | 10 ++- produce_response.go | 4 ++ request.go | 67 +++++++++++---------- sasl_authenticate_request.go | 4 ++ sasl_authenticate_response.go | 4 ++ sasl_handshake_request.go | 11 +++- sasl_handshake_response.go | 14 ++++- sync_group_request.go | 4 ++ sync_group_response.go | 4 ++ txn_offset_commit_request.go | 7 ++- txn_offset_commit_response.go | 7 ++- 83 files changed, 513 insertions(+), 87 deletions(-) diff --git a/acl_create_request.go b/acl_create_request.go index 449102f74..e581c984a 100644 --- a/acl_create_request.go +++ b/acl_create_request.go @@ -51,6 +51,10 @@ func (c *CreateAclsRequest) headerVersion() int16 { return 1 } +func (c *CreateAclsRequest) isValidVersion() bool { + return c.Version >= 0 && c.Version <= 1 +} + func (c *CreateAclsRequest) requiredVersion() KafkaVersion { switch c.Version { case 1: diff --git a/acl_create_response.go b/acl_create_response.go index ecfe119ce..d123ba863 100644 --- a/acl_create_response.go +++ b/acl_create_response.go @@ -4,6 +4,7 @@ import "time" // CreateAclsResponse is a an acl response creation type type CreateAclsResponse struct { + Version int16 ThrottleTime time.Duration AclCreationResponses []*AclCreationResponse } @@ -52,15 +53,24 @@ func (c *CreateAclsResponse) key() int16 { } func (c *CreateAclsResponse) version() int16 { - return 0 + return c.Version } func (c *CreateAclsResponse) headerVersion() int16 { return 0 } +func (c *CreateAclsResponse) isValidVersion() bool { + return c.Version >= 0 && c.Version <= 1 +} + func (c *CreateAclsResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch c.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *CreateAclsResponse) throttleTime() time.Duration { diff --git a/acl_delete_request.go b/acl_delete_request.go index 5e5c03bc2..abeb4425e 100644 --- a/acl_delete_request.go +++ b/acl_delete_request.go @@ -52,6 +52,10 @@ func (d *DeleteAclsRequest) headerVersion() int16 { return 1 } +func (d *DeleteAclsRequest) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 1 +} + func (d *DeleteAclsRequest) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/acl_delete_response.go b/acl_delete_response.go index 6f9b49c4a..2e2850b32 100644 --- a/acl_delete_response.go +++ b/acl_delete_response.go @@ -60,8 +60,17 @@ func (d *DeleteAclsResponse) headerVersion() int16 { return 0 } +func (d *DeleteAclsResponse) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 1 +} + func (d *DeleteAclsResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *DeleteAclsResponse) throttleTime() time.Duration { diff --git a/acl_describe_request.go b/acl_describe_request.go index 98edb6740..7d65bef14 100644 --- a/acl_describe_request.go +++ b/acl_describe_request.go @@ -29,6 +29,10 @@ func (d *DescribeAclsRequest) headerVersion() int16 { return 1 } +func (d *DescribeAclsRequest) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 1 +} + func (d *DescribeAclsRequest) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/acl_describe_response.go b/acl_describe_response.go index 0b43b5d8d..f89a53b66 100644 --- a/acl_describe_response.go +++ b/acl_describe_response.go @@ -81,6 +81,10 @@ func (d *DescribeAclsResponse) headerVersion() int16 { return 0 } +func (d *DescribeAclsResponse) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 1 +} + func (d *DescribeAclsResponse) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/add_offsets_to_txn_request.go b/add_offsets_to_txn_request.go index a96af9341..fe6f773bb 100644 --- a/add_offsets_to_txn_request.go +++ b/add_offsets_to_txn_request.go @@ -2,6 +2,7 @@ package sarama // AddOffsetsToTxnRequest adds offsets to a transaction request type AddOffsetsToTxnRequest struct { + Version int16 TransactionalID string ProducerID int64 ProducerEpoch int16 @@ -45,13 +46,17 @@ func (a *AddOffsetsToTxnRequest) key() int16 { } func (a *AddOffsetsToTxnRequest) version() int16 { - return 0 + return a.Version } func (a *AddOffsetsToTxnRequest) headerVersion() int16 { return 1 } +func (a *AddOffsetsToTxnRequest) isValidVersion() bool { + return a.Version == 0 +} + func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/add_offsets_to_txn_response.go b/add_offsets_to_txn_response.go index 35d6644c5..8df6f1ca6 100644 --- a/add_offsets_to_txn_response.go +++ b/add_offsets_to_txn_response.go @@ -6,6 +6,7 @@ import ( // AddOffsetsToTxnResponse is a response type for adding offsets to txns type AddOffsetsToTxnResponse struct { + Version int16 ThrottleTime time.Duration Err KError } @@ -37,13 +38,17 @@ func (a *AddOffsetsToTxnResponse) key() int16 { } func (a *AddOffsetsToTxnResponse) version() int16 { - return 0 + return a.Version } func (a *AddOffsetsToTxnResponse) headerVersion() int16 { return 0 } +func (a *AddOffsetsToTxnResponse) isValidVersion() bool { + return a.Version == 0 +} + func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/add_partitions_to_txn_request.go b/add_partitions_to_txn_request.go index 1d6da75f5..61d7b4fde 100644 --- a/add_partitions_to_txn_request.go +++ b/add_partitions_to_txn_request.go @@ -2,6 +2,7 @@ package sarama // AddPartitionsToTxnRequest is a add partition request type AddPartitionsToTxnRequest struct { + Version int16 TransactionalID string ProducerID int64 ProducerEpoch int16 @@ -69,13 +70,17 @@ func (a *AddPartitionsToTxnRequest) key() int16 { } func (a *AddPartitionsToTxnRequest) version() int16 { - return 0 + return a.Version } func (a *AddPartitionsToTxnRequest) headerVersion() int16 { return 1 } +func (a *AddPartitionsToTxnRequest) isValidVersion() bool { + return a.Version == 0 +} + func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/add_partitions_to_txn_response.go b/add_partitions_to_txn_response.go index 4adfaf883..2c38c57d1 100644 --- a/add_partitions_to_txn_response.go +++ b/add_partitions_to_txn_response.go @@ -6,6 +6,7 @@ import ( // AddPartitionsToTxnResponse is a partition errors to transaction type type AddPartitionsToTxnResponse struct { + Version int16 ThrottleTime time.Duration Errors map[string][]*PartitionError } @@ -76,13 +77,17 @@ func (a *AddPartitionsToTxnResponse) key() int16 { } func (a *AddPartitionsToTxnResponse) version() int16 { - return 0 + return a.Version } func (a *AddPartitionsToTxnResponse) headerVersion() int16 { return 0 } +func (a *AddPartitionsToTxnResponse) isValidVersion() bool { + return a.Version == 0 +} + func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/alter_client_quotas_request.go b/alter_client_quotas_request.go index f528512d0..a7fa0cbd1 100644 --- a/alter_client_quotas_request.go +++ b/alter_client_quotas_request.go @@ -12,6 +12,7 @@ package sarama // validate_only => BOOLEAN type AlterClientQuotasRequest struct { + Version int16 Entries []AlterClientQuotasEntry // The quota configuration entries to alter. ValidateOnly bool // Whether the alteration should be validated, but not performed. } @@ -182,13 +183,17 @@ func (a *AlterClientQuotasRequest) key() int16 { } func (a *AlterClientQuotasRequest) version() int16 { - return 0 + return a.Version } func (a *AlterClientQuotasRequest) headerVersion() int16 { return 1 } +func (a *AlterClientQuotasRequest) isValidVersion() bool { + return a.Version == 0 +} + func (a *AlterClientQuotasRequest) requiredVersion() KafkaVersion { return V2_6_0_0 } diff --git a/alter_client_quotas_response.go b/alter_client_quotas_response.go index 4d68e69ed..cce997cae 100644 --- a/alter_client_quotas_response.go +++ b/alter_client_quotas_response.go @@ -14,6 +14,7 @@ import ( // entity_name => NULLABLE_STRING type AlterClientQuotasResponse struct { + Version int16 ThrottleTime time.Duration // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. Entries []AlterClientQuotasEntryResponse // The quota configuration entries altered. } @@ -133,13 +134,17 @@ func (a *AlterClientQuotasResponse) key() int16 { } func (a *AlterClientQuotasResponse) version() int16 { - return 0 + return a.Version } func (a *AlterClientQuotasResponse) headerVersion() int16 { return 0 } +func (a *AlterClientQuotasResponse) isValidVersion() bool { + return a.Version == 0 +} + func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion { return V2_6_0_0 } diff --git a/alter_configs_request.go b/alter_configs_request.go index 8b94b1f3f..cf51beb67 100644 --- a/alter_configs_request.go +++ b/alter_configs_request.go @@ -2,6 +2,7 @@ package sarama // AlterConfigsRequest is an alter config request type type AlterConfigsRequest struct { + Version int16 Resources []*AlterConfigsResource ValidateOnly bool } @@ -114,13 +115,17 @@ func (a *AlterConfigsRequest) key() int16 { } func (a *AlterConfigsRequest) version() int16 { - return 0 + return a.Version } func (a *AlterConfigsRequest) headerVersion() int16 { return 1 } +func (a *AlterConfigsRequest) isValidVersion() bool { + return a.Version == 0 +} + func (a *AlterConfigsRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/alter_configs_response.go b/alter_configs_response.go index 15749a8f8..72ffcc9f7 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -4,6 +4,7 @@ import "time" // AlterConfigsResponse is a response type for alter config type AlterConfigsResponse struct { + Version int16 ThrottleTime time.Duration Resources []*AlterConfigsResourceResponse } @@ -104,13 +105,17 @@ func (a *AlterConfigsResponse) key() int16 { } func (a *AlterConfigsResponse) version() int16 { - return 0 + return a.Version } func (a *AlterConfigsResponse) headerVersion() int16 { return 0 } +func (a *AlterConfigsResponse) isValidVersion() bool { + return a.Version == 0 +} + func (a *AlterConfigsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/alter_partition_reassignments_request.go b/alter_partition_reassignments_request.go index f0a2f9dd5..f898f87a2 100644 --- a/alter_partition_reassignments_request.go +++ b/alter_partition_reassignments_request.go @@ -113,6 +113,10 @@ func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 { return 2 } +func (r *AlterPartitionReassignmentsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion { return V2_4_0_0 } diff --git a/alter_partition_reassignments_response.go b/alter_partition_reassignments_response.go index 0765486ee..1ee56b40e 100644 --- a/alter_partition_reassignments_response.go +++ b/alter_partition_reassignments_response.go @@ -154,6 +154,10 @@ func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 { return 1 } +func (r *AlterPartitionReassignmentsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion { return V2_4_0_0 } diff --git a/alter_user_scram_credentials_request.go b/alter_user_scram_credentials_request.go index 0530d8946..f29f164cf 100644 --- a/alter_user_scram_credentials_request.go +++ b/alter_user_scram_credentials_request.go @@ -137,6 +137,10 @@ func (r *AlterUserScramCredentialsRequest) headerVersion() int16 { return 2 } +func (r *AlterUserScramCredentialsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *AlterUserScramCredentialsRequest) requiredVersion() KafkaVersion { return V2_7_0_0 } diff --git a/alter_user_scram_credentials_response.go b/alter_user_scram_credentials_response.go index 018483c9e..75eac0cec 100644 --- a/alter_user_scram_credentials_response.go +++ b/alter_user_scram_credentials_response.go @@ -89,6 +89,10 @@ func (r *AlterUserScramCredentialsResponse) headerVersion() int16 { return 2 } +func (r *AlterUserScramCredentialsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *AlterUserScramCredentialsResponse) requiredVersion() KafkaVersion { return V2_7_0_0 } diff --git a/api_versions_request.go b/api_versions_request.go index e5b3baf64..46823537a 100644 --- a/api_versions_request.go +++ b/api_versions_request.go @@ -57,6 +57,10 @@ func (r *ApiVersionsRequest) headerVersion() int16 { return 1 } +func (r *ApiVersionsRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *ApiVersionsRequest) requiredVersion() KafkaVersion { switch r.Version { case 0: diff --git a/api_versions_response.go b/api_versions_response.go index 9643ee1fc..abb669011 100644 --- a/api_versions_response.go +++ b/api_versions_response.go @@ -146,6 +146,10 @@ func (r *ApiVersionsResponse) headerVersion() int16 { return 0 } +func (r *ApiVersionsResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *ApiVersionsResponse) requiredVersion() KafkaVersion { switch r.Version { case 0: diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index 5c18e048a..3bef616d2 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -2,6 +2,7 @@ package sarama // ConsumerMetadataRequest is used for metadata requests type ConsumerMetadataRequest struct { + Version int16 ConsumerGroup string } @@ -26,13 +27,17 @@ func (r *ConsumerMetadataRequest) key() int16 { } func (r *ConsumerMetadataRequest) version() int16 { - return 0 + return r.Version } func (r *ConsumerMetadataRequest) headerVersion() int16 { return 1 } +func (r *ConsumerMetadataRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion { return V0_8_2_0 } diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 7fe0cf971..a8e95b2ce 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -7,6 +7,7 @@ import ( // ConsumerMetadataResponse holds the response for a consumer group meta data requests type ConsumerMetadataResponse struct { + Version int16 Err KError Coordinator *Broker CoordinatorID int32 // deprecated: use Coordinator.ID() @@ -70,13 +71,17 @@ func (r *ConsumerMetadataResponse) key() int16 { } func (r *ConsumerMetadataResponse) version() int16 { - return 0 + return r.Version } func (r *ConsumerMetadataResponse) headerVersion() int16 { return 0 } +func (r *ConsumerMetadataResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion { return V0_8_2_0 } diff --git a/create_partitions_request.go b/create_partitions_request.go index 46fb04402..68435d639 100644 --- a/create_partitions_request.go +++ b/create_partitions_request.go @@ -3,6 +3,7 @@ package sarama import "time" type CreatePartitionsRequest struct { + Version int16 TopicPartitions map[string]*TopicPartition Timeout time.Duration ValidateOnly bool @@ -64,13 +65,17 @@ func (r *CreatePartitionsRequest) key() int16 { } func (r *CreatePartitionsRequest) version() int16 { - return 0 + return r.Version } func (r *CreatePartitionsRequest) headerVersion() int16 { return 1 } +func (r *CreatePartitionsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/create_partitions_response.go b/create_partitions_response.go index 2ac2d11a9..b9695ae53 100644 --- a/create_partitions_response.go +++ b/create_partitions_response.go @@ -6,6 +6,7 @@ import ( ) type CreatePartitionsResponse struct { + Version int16 ThrottleTime time.Duration TopicPartitionErrors map[string]*TopicPartitionError } @@ -60,13 +61,17 @@ func (r *CreatePartitionsResponse) key() int16 { } func (r *CreatePartitionsResponse) version() int16 { - return 0 + return r.Version } func (r *CreatePartitionsResponse) headerVersion() int16 { return 0 } +func (r *CreatePartitionsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/create_topics_request.go b/create_topics_request.go index 287acd069..1a937a803 100644 --- a/create_topics_request.go +++ b/create_topics_request.go @@ -83,12 +83,18 @@ func (r *CreateTopicsRequest) headerVersion() int16 { return 1 } +func (c *CreateTopicsRequest) isValidVersion() bool { + return c.Version >= 0 && c.Version <= 3 +} + func (c *CreateTopicsRequest) requiredVersion() KafkaVersion { switch c.Version { + case 3: + return V2_0_0_0 case 2: - return V1_0_0_0 - case 1: return V0_11_0_0 + case 1: + return V0_10_2_0 default: return V0_10_1_0 } diff --git a/create_topics_response.go b/create_topics_response.go index f3961b7b8..af6433d94 100644 --- a/create_topics_response.go +++ b/create_topics_response.go @@ -74,12 +74,18 @@ func (c *CreateTopicsResponse) headerVersion() int16 { return 0 } +func (c *CreateTopicsResponse) isValidVersion() bool { + return c.Version >= 0 && c.Version <= 3 +} + func (c *CreateTopicsResponse) requiredVersion() KafkaVersion { switch c.Version { + case 3: + return V2_0_0_0 case 2: - return V1_0_0_0 - case 1: return V0_11_0_0 + case 1: + return V0_10_2_0 default: return V0_10_1_0 } diff --git a/delete_groups_request.go b/delete_groups_request.go index 4ac8bbee4..2158370d5 100644 --- a/delete_groups_request.go +++ b/delete_groups_request.go @@ -1,7 +1,8 @@ package sarama type DeleteGroupsRequest struct { - Groups []string + Version int16 + Groups []string } func (r *DeleteGroupsRequest) encode(pe packetEncoder) error { @@ -18,13 +19,17 @@ func (r *DeleteGroupsRequest) key() int16 { } func (r *DeleteGroupsRequest) version() int16 { - return 0 + return r.Version } func (r *DeleteGroupsRequest) headerVersion() int16 { return 1 } +func (r *DeleteGroupsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion { return V1_1_0_0 } diff --git a/delete_groups_response.go b/delete_groups_response.go index 13d210fca..2d77ed2b2 100644 --- a/delete_groups_response.go +++ b/delete_groups_response.go @@ -5,6 +5,7 @@ import ( ) type DeleteGroupsResponse struct { + Version int16 ThrottleTime time.Duration GroupErrorCodes map[string]KError } @@ -62,13 +63,17 @@ func (r *DeleteGroupsResponse) key() int16 { } func (r *DeleteGroupsResponse) version() int16 { - return 0 + return r.Version } func (r *DeleteGroupsResponse) headerVersion() int16 { return 0 } +func (r *DeleteGroupsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { return V1_1_0_0 } diff --git a/delete_offsets_request.go b/delete_offsets_request.go index 339c7857c..06b864d18 100644 --- a/delete_offsets_request.go +++ b/delete_offsets_request.go @@ -1,6 +1,7 @@ package sarama type DeleteOffsetsRequest struct { + Version int16 Group string partitions map[string][]int32 } @@ -72,13 +73,17 @@ func (r *DeleteOffsetsRequest) key() int16 { } func (r *DeleteOffsetsRequest) version() int16 { - return 0 + return r.Version } func (r *DeleteOffsetsRequest) headerVersion() int16 { return 1 } +func (r *DeleteOffsetsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *DeleteOffsetsRequest) requiredVersion() KafkaVersion { return V2_4_0_0 } diff --git a/delete_offsets_response.go b/delete_offsets_response.go index 4712423c2..86c6c51f6 100644 --- a/delete_offsets_response.go +++ b/delete_offsets_response.go @@ -5,6 +5,7 @@ import ( ) type DeleteOffsetsResponse struct { + Version int16 // The top-level error code, or 0 if there was no error. ErrorCode KError ThrottleTime time.Duration @@ -100,13 +101,17 @@ func (r *DeleteOffsetsResponse) key() int16 { } func (r *DeleteOffsetsResponse) version() int16 { - return 0 + return r.Version } func (r *DeleteOffsetsResponse) headerVersion() int16 { return 0 } +func (r *DeleteOffsetsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *DeleteOffsetsResponse) requiredVersion() KafkaVersion { return V2_4_0_0 } diff --git a/delete_records_request.go b/delete_records_request.go index dc106b17d..687b042ad 100644 --- a/delete_records_request.go +++ b/delete_records_request.go @@ -13,6 +13,7 @@ import ( // id(int32) offset(int64) type DeleteRecordsRequest struct { + Version int16 Topics map[string]*DeleteRecordsRequestTopic Timeout time.Duration } @@ -74,13 +75,17 @@ func (d *DeleteRecordsRequest) key() int16 { } func (d *DeleteRecordsRequest) version() int16 { - return 0 + return d.Version } func (d *DeleteRecordsRequest) headerVersion() int16 { return 1 } +func (d *DeleteRecordsRequest) isValidVersion() bool { + return d.Version == 0 +} + func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/delete_records_response.go b/delete_records_response.go index f501338ae..cb287524c 100644 --- a/delete_records_response.go +++ b/delete_records_response.go @@ -77,13 +77,17 @@ func (d *DeleteRecordsResponse) key() int16 { } func (d *DeleteRecordsResponse) version() int16 { - return 0 + return d.Version } func (d *DeleteRecordsResponse) headerVersion() int16 { return 0 } +func (d *DeleteRecordsResponse) isValidVersion() bool { + return d.Version == 0 +} + func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/delete_topics_request.go b/delete_topics_request.go index ba6780a8e..f8af2c5ba 100644 --- a/delete_topics_request.go +++ b/delete_topics_request.go @@ -42,8 +42,16 @@ func (d *DeleteTopicsRequest) headerVersion() int16 { return 1 } +func (d *DeleteTopicsRequest) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 3 +} + func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion { switch d.Version { + case 3: + return V2_1_0_0 + case 2: + return V2_0_0_0 case 1: return V0_11_0_0 default: diff --git a/delete_topics_response.go b/delete_topics_response.go index efafba5d4..a640f50a6 100644 --- a/delete_topics_response.go +++ b/delete_topics_response.go @@ -72,8 +72,16 @@ func (d *DeleteTopicsResponse) headerVersion() int16 { return 0 } +func (d *DeleteTopicsResponse) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 3 +} + func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion { switch d.Version { + case 3: + return V2_1_0_0 + case 2: + return V2_0_0_0 case 1: return V0_11_0_0 default: diff --git a/describe_client_quotas_request.go b/describe_client_quotas_request.go index 17a82051c..8869145c3 100644 --- a/describe_client_quotas_request.go +++ b/describe_client_quotas_request.go @@ -11,6 +11,7 @@ package sarama // Components: the components to filter on // Strict: whether the filter only includes specified components type DescribeClientQuotasRequest struct { + Version int16 Components []QuotaFilterComponent Strict bool } @@ -129,13 +130,17 @@ func (d *DescribeClientQuotasRequest) key() int16 { } func (d *DescribeClientQuotasRequest) version() int16 { - return 0 + return d.Version } func (d *DescribeClientQuotasRequest) headerVersion() int16 { return 1 } +func (d *DescribeClientQuotasRequest) isValidVersion() bool { + return d.Version == 0 +} + func (d *DescribeClientQuotasRequest) requiredVersion() KafkaVersion { return V2_6_0_0 } diff --git a/describe_client_quotas_response.go b/describe_client_quotas_response.go index 2b1336dc7..e9bf658ad 100644 --- a/describe_client_quotas_response.go +++ b/describe_client_quotas_response.go @@ -17,6 +17,7 @@ import ( // value => FLOAT64 type DescribeClientQuotasResponse struct { + Version int16 ThrottleTime time.Duration // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. ErrorCode KError // The error code, or `0` if the quota description succeeded. ErrorMsg *string // The error message, or `null` if the quota description succeeded. @@ -223,13 +224,17 @@ func (d *DescribeClientQuotasResponse) key() int16 { } func (d *DescribeClientQuotasResponse) version() int16 { - return 0 + return d.Version } func (d *DescribeClientQuotasResponse) headerVersion() int16 { return 0 } +func (d *DescribeClientQuotasResponse) isValidVersion() bool { + return d.Version == 0 +} + func (d *DescribeClientQuotasResponse) requiredVersion() KafkaVersion { return V2_6_0_0 } diff --git a/describe_configs_request.go b/describe_configs_request.go index 4c3488031..2d2d906c4 100644 --- a/describe_configs_request.go +++ b/describe_configs_request.go @@ -103,6 +103,10 @@ func (r *DescribeConfigsRequest) headerVersion() int16 { return 1 } +func (r *DescribeConfigsRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/describe_configs_response.go b/describe_configs_response.go index 05036fb09..772d06b85 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -116,6 +116,10 @@ func (r *DescribeConfigsResponse) headerVersion() int16 { return 0 } +func (r *DescribeConfigsResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/describe_groups_request.go b/describe_groups_request.go index fc8e6b588..da051792b 100644 --- a/describe_groups_request.go +++ b/describe_groups_request.go @@ -42,6 +42,10 @@ func (r *DescribeGroupsRequest) headerVersion() int16 { return 1 } +func (r *DescribeGroupsRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 4 +} + func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/describe_groups_response.go b/describe_groups_response.go index 36ccb702f..8ba2f045b 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -65,6 +65,10 @@ func (r *DescribeGroupsResponse) headerVersion() int16 { return 0 } +func (r *DescribeGroupsResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 4 +} + func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/describe_log_dirs_request.go b/describe_log_dirs_request.go index c0bf04e04..9dcdf56d2 100644 --- a/describe_log_dirs_request.go +++ b/describe_log_dirs_request.go @@ -82,6 +82,10 @@ func (r *DescribeLogDirsRequest) headerVersion() int16 { return 1 } +func (r *DescribeLogDirsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/describe_log_dirs_response.go b/describe_log_dirs_response.go index 47ff20ac9..555b2b1b2 100644 --- a/describe_log_dirs_response.go +++ b/describe_log_dirs_response.go @@ -65,6 +65,10 @@ func (r *DescribeLogDirsResponse) headerVersion() int16 { return 0 } +func (r *DescribeLogDirsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/describe_user_scram_credentials_request.go b/describe_user_scram_credentials_request.go index b5b59404b..a6265de5f 100644 --- a/describe_user_scram_credentials_request.go +++ b/describe_user_scram_credentials_request.go @@ -65,6 +65,10 @@ func (r *DescribeUserScramCredentialsRequest) headerVersion() int16 { return 2 } +func (r *DescribeUserScramCredentialsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *DescribeUserScramCredentialsRequest) requiredVersion() KafkaVersion { return V2_7_0_0 } diff --git a/describe_user_scram_credentials_response.go b/describe_user_scram_credentials_response.go index a209208fe..a55c3f0ee 100644 --- a/describe_user_scram_credentials_response.go +++ b/describe_user_scram_credentials_response.go @@ -163,6 +163,10 @@ func (r *DescribeUserScramCredentialsResponse) headerVersion() int16 { return 2 } +func (r *DescribeUserScramCredentialsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *DescribeUserScramCredentialsResponse) requiredVersion() KafkaVersion { return V2_7_0_0 } diff --git a/end_txn_request.go b/end_txn_request.go index 6635425dd..e0e3872c6 100644 --- a/end_txn_request.go +++ b/end_txn_request.go @@ -1,6 +1,7 @@ package sarama type EndTxnRequest struct { + Version int16 TransactionalID string ProducerID int64 ProducerEpoch int16 @@ -42,13 +43,17 @@ func (a *EndTxnRequest) key() int16 { } func (a *EndTxnRequest) version() int16 { - return 0 + return a.Version } func (r *EndTxnRequest) headerVersion() int16 { return 1 } +func (a *EndTxnRequest) isValidVersion() bool { + return a.Version == 0 +} + func (a *EndTxnRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/end_txn_response.go b/end_txn_response.go index a3a28011b..7fcfb0406 100644 --- a/end_txn_response.go +++ b/end_txn_response.go @@ -5,6 +5,7 @@ import ( ) type EndTxnResponse struct { + Version int16 ThrottleTime time.Duration Err KError } @@ -36,13 +37,17 @@ func (e *EndTxnResponse) key() int16 { } func (e *EndTxnResponse) version() int16 { - return 0 + return e.Version } func (r *EndTxnResponse) headerVersion() int16 { return 0 } +func (e *EndTxnResponse) isValidVersion() bool { + return e.Version == 0 +} + func (e *EndTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/fetch_request.go b/fetch_request.go index 26adead4e..f96415b99 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -275,6 +275,10 @@ func (r *FetchRequest) headerVersion() int16 { return 1 } +func (r *FetchRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 11 +} + func (r *FetchRequest) requiredVersion() KafkaVersion { switch r.Version { case 0: diff --git a/fetch_response.go b/fetch_response.go index e1700b7b8..7790b7ddd 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -386,6 +386,10 @@ func (r *FetchResponse) headerVersion() int16 { return 0 } +func (r *FetchResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 11 +} + func (r *FetchResponse) requiredVersion() KafkaVersion { switch r.Version { case 0: diff --git a/find_coordinator_request.go b/find_coordinator_request.go index 597bcbf78..4758835a1 100644 --- a/find_coordinator_request.go +++ b/find_coordinator_request.go @@ -55,8 +55,14 @@ func (r *FindCoordinatorRequest) headerVersion() int16 { return 1 } +func (f *FindCoordinatorRequest) isValidVersion() bool { + return f.Version >= 0 && f.Version <= 2 +} + func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion { switch f.Version { + case 2: + return V2_0_0_0 case 1: return V0_11_0_0 default: diff --git a/find_coordinator_response.go b/find_coordinator_response.go index 68cbcbebe..11b9920d0 100644 --- a/find_coordinator_response.go +++ b/find_coordinator_response.go @@ -86,8 +86,14 @@ func (r *FindCoordinatorResponse) headerVersion() int16 { return 0 } +func (f *FindCoordinatorResponse) isValidVersion() bool { + return f.Version >= 0 && f.Version <= 2 +} + func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion { switch f.Version { + case 2: + return V2_0_0_0 case 1: return V0_11_0_0 default: diff --git a/heartbeat_request.go b/heartbeat_request.go index 511910e71..77a19f816 100644 --- a/heartbeat_request.go +++ b/heartbeat_request.go @@ -60,6 +60,10 @@ func (r *HeartbeatRequest) headerVersion() int16 { return 1 } +func (r *HeartbeatRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *HeartbeatRequest) requiredVersion() KafkaVersion { switch { case r.Version >= 3: diff --git a/heartbeat_response.go b/heartbeat_response.go index d0a6a2eff..d6dd07102 100644 --- a/heartbeat_response.go +++ b/heartbeat_response.go @@ -45,6 +45,10 @@ func (r *HeartbeatResponse) headerVersion() int16 { return 0 } +func (r *HeartbeatResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *HeartbeatResponse) requiredVersion() KafkaVersion { switch r.Version { case 1, 2, 3: diff --git a/incremental_alter_configs_request.go b/incremental_alter_configs_request.go index c4d05a972..b1b490a28 100644 --- a/incremental_alter_configs_request.go +++ b/incremental_alter_configs_request.go @@ -11,6 +11,7 @@ const ( // IncrementalAlterConfigsRequest is an incremental alter config request type type IncrementalAlterConfigsRequest struct { + Version int16 Resources []*IncrementalAlterConfigsResource ValidateOnly bool } @@ -161,13 +162,17 @@ func (a *IncrementalAlterConfigsRequest) key() int16 { } func (a *IncrementalAlterConfigsRequest) version() int16 { - return 0 + return a.Version } func (a *IncrementalAlterConfigsRequest) headerVersion() int16 { return 1 } +func (a *IncrementalAlterConfigsRequest) isValidVersion() bool { + return a.Version == 0 +} + func (a *IncrementalAlterConfigsRequest) requiredVersion() KafkaVersion { return V2_3_0_0 } diff --git a/incremental_alter_configs_response.go b/incremental_alter_configs_response.go index 2320ed287..3a2df2f60 100644 --- a/incremental_alter_configs_response.go +++ b/incremental_alter_configs_response.go @@ -4,6 +4,7 @@ import "time" // IncrementalAlterConfigsResponse is a response type for incremental alter config type IncrementalAlterConfigsResponse struct { + Version int16 ThrottleTime time.Duration Resources []*AlterConfigsResourceResponse } @@ -54,13 +55,17 @@ func (a *IncrementalAlterConfigsResponse) key() int16 { } func (a *IncrementalAlterConfigsResponse) version() int16 { - return 0 + return a.Version } func (a *IncrementalAlterConfigsResponse) headerVersion() int16 { return 0 } +func (a *IncrementalAlterConfigsResponse) isValidVersion() bool { + return a.Version == 0 +} + func (a *IncrementalAlterConfigsResponse) requiredVersion() KafkaVersion { return V2_3_0_0 } diff --git a/init_producer_id_request.go b/init_producer_id_request.go index 33ce5fa41..ff3f4de78 100644 --- a/init_producer_id_request.go +++ b/init_producer_id_request.go @@ -84,6 +84,10 @@ func (i *InitProducerIDRequest) headerVersion() int16 { return 1 } +func (i *InitProducerIDRequest) isValidVersion() bool { + return i.Version >= 0 && i.Version <= 3 +} + func (i *InitProducerIDRequest) requiredVersion() KafkaVersion { switch i.Version { case 2: diff --git a/init_producer_id_response.go b/init_producer_id_response.go index e22580922..22e56e80c 100644 --- a/init_producer_id_response.go +++ b/init_producer_id_response.go @@ -69,6 +69,10 @@ func (i *InitProducerIDResponse) headerVersion() int16 { return 0 } +func (i *InitProducerIDResponse) isValidVersion() bool { + return i.Version >= 0 && i.Version <= 3 +} + func (i *InitProducerIDResponse) requiredVersion() KafkaVersion { switch i.Version { case 2: diff --git a/join_group_request.go b/join_group_request.go index 432338cd5..de5a2df77 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -150,6 +150,10 @@ func (r *JoinGroupRequest) headerVersion() int16 { return 1 } +func (r *JoinGroupRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 5 +} + func (r *JoinGroupRequest) requiredVersion() KafkaVersion { switch r.Version { case 4, 5: diff --git a/join_group_response.go b/join_group_response.go index e1fd94c04..0409f88eb 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -147,6 +147,10 @@ func (r *JoinGroupResponse) headerVersion() int16 { return 0 } +func (r *JoinGroupResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 5 +} + func (r *JoinGroupResponse) requiredVersion() KafkaVersion { switch r.Version { case 3, 4, 5: diff --git a/leave_group_request.go b/leave_group_request.go index 741b7290a..7dc5005a1 100644 --- a/leave_group_request.go +++ b/leave_group_request.go @@ -81,10 +81,17 @@ func (r *LeaveGroupRequest) headerVersion() int16 { return 1 } +func (r *LeaveGroupRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *LeaveGroupRequest) requiredVersion() KafkaVersion { - switch r.Version { - case 1, 2, 3: - return V2_3_0_0 + switch { + case r.Version >= 2: + return V2_0_0_0 + case r.Version >= 1: + return V1_0_0_0 + default: + return V0_9_0_0 } - return V0_9_0_0 } diff --git a/leave_group_response.go b/leave_group_response.go index ccc7e5687..586debeae 100644 --- a/leave_group_response.go +++ b/leave_group_response.go @@ -85,6 +85,10 @@ func (r *LeaveGroupResponse) headerVersion() int16 { return 0 } +func (r *LeaveGroupResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *LeaveGroupResponse) requiredVersion() KafkaVersion { switch r.Version { case 1, 2, 3: diff --git a/list_groups_request.go b/list_groups_request.go index 4553b2d2e..68b3c8f34 100644 --- a/list_groups_request.go +++ b/list_groups_request.go @@ -1,6 +1,8 @@ package sarama -type ListGroupsRequest struct{} +type ListGroupsRequest struct { + Version int16 +} func (r *ListGroupsRequest) encode(pe packetEncoder) error { return nil @@ -15,13 +17,24 @@ func (r *ListGroupsRequest) key() int16 { } func (r *ListGroupsRequest) version() int16 { - return 0 + return r.Version } func (r *ListGroupsRequest) headerVersion() int16 { return 1 } +func (r *ListGroupsRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + func (r *ListGroupsRequest) requiredVersion() KafkaVersion { - return V0_9_0_0 + switch r.Version { + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_9_0_0 + } } diff --git a/list_groups_response.go b/list_groups_response.go index 777bae7e6..a4fd15a34 100644 --- a/list_groups_response.go +++ b/list_groups_response.go @@ -1,8 +1,9 @@ package sarama type ListGroupsResponse struct { - Err KError - Groups map[string]string + Version int16 + Err KError + Groups map[string]string } func (r *ListGroupsResponse) encode(pe packetEncoder) error { @@ -61,13 +62,24 @@ func (r *ListGroupsResponse) key() int16 { } func (r *ListGroupsResponse) version() int16 { - return 0 + return r.Version } func (r *ListGroupsResponse) headerVersion() int16 { return 0 } +func (r *ListGroupsResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + func (r *ListGroupsResponse) requiredVersion() KafkaVersion { - return V0_9_0_0 + switch r.Version { + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_9_0_0 + } } diff --git a/list_partition_reassignments_request.go b/list_partition_reassignments_request.go index c1ffa9ba0..c7ad5e981 100644 --- a/list_partition_reassignments_request.go +++ b/list_partition_reassignments_request.go @@ -83,6 +83,10 @@ func (r *ListPartitionReassignmentsRequest) headerVersion() int16 { return 2 } +func (r *ListPartitionReassignmentsRequest) isValidVersion() bool { + return r.Version == 0 +} + func (r *ListPartitionReassignmentsRequest) requiredVersion() KafkaVersion { return V2_4_0_0 } diff --git a/list_partition_reassignments_response.go b/list_partition_reassignments_response.go index 568791fea..426f1c771 100644 --- a/list_partition_reassignments_response.go +++ b/list_partition_reassignments_response.go @@ -166,6 +166,10 @@ func (r *ListPartitionReassignmentsResponse) headerVersion() int16 { return 1 } +func (r *ListPartitionReassignmentsResponse) isValidVersion() bool { + return r.Version == 0 +} + func (r *ListPartitionReassignmentsResponse) requiredVersion() KafkaVersion { return V2_4_0_0 } diff --git a/metadata_request.go b/metadata_request.go index e0462b720..9e46eef94 100644 --- a/metadata_request.go +++ b/metadata_request.go @@ -88,6 +88,10 @@ func (r *MetadataRequest) headerVersion() int16 { return 1 } +func (r *MetadataRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *MetadataRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/metadata_response.go b/metadata_response.go index a9b979150..48c8b953f 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -275,6 +275,10 @@ func (r *MetadataResponse) headerVersion() int16 { return 0 } +func (r *MetadataResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *MetadataResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_commit_request.go b/offset_commit_request.go index ed0566fe6..edb585039 100644 --- a/offset_commit_request.go +++ b/offset_commit_request.go @@ -201,6 +201,10 @@ func (r *OffsetCommitRequest) headerVersion() int16 { return 1 } +func (r *OffsetCommitRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *OffsetCommitRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_commit_response.go b/offset_commit_response.go index 80828c853..efa5c485d 100644 --- a/offset_commit_response.go +++ b/offset_commit_response.go @@ -100,6 +100,10 @@ func (r *OffsetCommitResponse) headerVersion() int16 { return 0 } +func (r *OffsetCommitResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *OffsetCommitResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_fetch_request.go b/offset_fetch_request.go index 7e147eb60..714529bc4 100644 --- a/offset_fetch_request.go +++ b/offset_fetch_request.go @@ -171,6 +171,10 @@ func (r *OffsetFetchRequest) headerVersion() int16 { return 1 } +func (r *OffsetFetchRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *OffsetFetchRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_fetch_response.go b/offset_fetch_response.go index e2e34d7ff..7dc261f74 100644 --- a/offset_fetch_response.go +++ b/offset_fetch_response.go @@ -236,6 +236,10 @@ func (r *OffsetFetchResponse) headerVersion() int16 { return 0 } +func (r *OffsetFetchResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *OffsetFetchResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_request.go b/offset_request.go index 4c9ce4df5..8cf0e1d49 100644 --- a/offset_request.go +++ b/offset_request.go @@ -137,12 +137,22 @@ func (r *OffsetRequest) headerVersion() int16 { return 1 } +func (r *OffsetRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 5 +} + func (r *OffsetRequest) requiredVersion() KafkaVersion { switch r.Version { - case 1: - return V0_10_1_0 + case 5: + return V2_2_0_0 + case 4: + return V2_1_0_0 + case 3: + return V2_0_0_0 case 2: return V0_11_0_0 + case 1: + return V0_10_1_0 default: return MinVersion } diff --git a/offset_response.go b/offset_response.go index 59d158d9d..4aa91e4fd 100644 --- a/offset_response.go +++ b/offset_response.go @@ -167,12 +167,22 @@ func (r *OffsetResponse) headerVersion() int16 { return 0 } +func (r *OffsetResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 5 +} + func (r *OffsetResponse) requiredVersion() KafkaVersion { switch r.Version { - case 1: - return V0_10_1_0 + case 5: + return V2_2_0_0 + case 4: + return V2_1_0_0 + case 3: + return V2_0_0_0 case 2: return V0_11_0_0 + case 1: + return V0_10_1_0 default: return MinVersion } diff --git a/produce_request.go b/produce_request.go index 0034651e2..aed227533 100644 --- a/produce_request.go +++ b/produce_request.go @@ -29,7 +29,8 @@ type ProduceRequest struct { } func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram, - topicCompressionRatioMetric metrics.Histogram) int64 { + topicCompressionRatioMetric metrics.Histogram, +) int64 { var topicRecordCount int64 for _, messageBlock := range msgSet.Messages { // Is this a fake "message" wrapping real messages? @@ -53,7 +54,8 @@ func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Hist } func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram, - topicCompressionRatioMetric metrics.Histogram) int64 { + topicCompressionRatioMetric metrics.Histogram, +) int64 { if recordBatch.compressedRecords != nil { compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100) compressionRatioMetric.Update(compressionRatio) @@ -210,6 +212,10 @@ func (r *ProduceRequest) headerVersion() int16 { return 1 } +func (r *ProduceRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *ProduceRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/produce_response.go b/produce_response.go index d7cda5d6b..37ad8db2d 100644 --- a/produce_response.go +++ b/produce_response.go @@ -175,6 +175,10 @@ func (r *ProduceResponse) headerVersion() int16 { return 0 } +func (r *ProduceResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 7 +} + func (r *ProduceResponse) requiredVersion() KafkaVersion { return MinVersion } diff --git a/request.go b/request.go index 1e3923de7..6011fa007 100644 --- a/request.go +++ b/request.go @@ -12,6 +12,7 @@ type protocolBody interface { key() int16 version() int16 headerVersion() int16 + isValidVersion() bool requiredVersion() KafkaVersion } @@ -119,7 +120,7 @@ func decodeRequest(r io.Reader) (*request, int, error) { func allocateBody(key, version int16) protocolBody { switch key { case 0: - return &ProduceRequest{} + return &ProduceRequest{Version: version} case 1: return &FetchRequest{Version: version} case 2: @@ -131,73 +132,73 @@ func allocateBody(key, version int16) protocolBody { case 9: return &OffsetFetchRequest{Version: version} case 10: - return &FindCoordinatorRequest{} + return &FindCoordinatorRequest{Version: version} case 11: - return &JoinGroupRequest{} + return &JoinGroupRequest{Version: version} case 12: - return &HeartbeatRequest{} + return &HeartbeatRequest{Version: version} case 13: - return &LeaveGroupRequest{} + return &LeaveGroupRequest{Version: version} case 14: - return &SyncGroupRequest{} + return &SyncGroupRequest{Version: version} case 15: - return &DescribeGroupsRequest{} + return &DescribeGroupsRequest{Version: version} case 16: - return &ListGroupsRequest{} + return &ListGroupsRequest{Version: version} case 17: - return &SaslHandshakeRequest{} + return &SaslHandshakeRequest{Version: version} case 18: return &ApiVersionsRequest{Version: version} case 19: - return &CreateTopicsRequest{} + return &CreateTopicsRequest{Version: version} case 20: - return &DeleteTopicsRequest{} + return &DeleteTopicsRequest{Version: version} case 21: - return &DeleteRecordsRequest{} + return &DeleteRecordsRequest{Version: version} case 22: return &InitProducerIDRequest{Version: version} case 24: - return &AddPartitionsToTxnRequest{} + return &AddPartitionsToTxnRequest{Version: version} case 25: - return &AddOffsetsToTxnRequest{} + return &AddOffsetsToTxnRequest{Version: version} case 26: - return &EndTxnRequest{} + return &EndTxnRequest{Version: version} case 28: - return &TxnOffsetCommitRequest{} + return &TxnOffsetCommitRequest{Version: version} case 29: - return &DescribeAclsRequest{} + return &DescribeAclsRequest{Version: int(version)} case 30: - return &CreateAclsRequest{} + return &CreateAclsRequest{Version: version} case 31: - return &DeleteAclsRequest{} + return &DeleteAclsRequest{Version: int(version)} case 32: - return &DescribeConfigsRequest{} + return &DescribeConfigsRequest{Version: version} case 33: - return &AlterConfigsRequest{} + return &AlterConfigsRequest{Version: version} case 35: - return &DescribeLogDirsRequest{} + return &DescribeLogDirsRequest{Version: version} case 36: - return &SaslAuthenticateRequest{} + return &SaslAuthenticateRequest{Version: version} case 37: - return &CreatePartitionsRequest{} + return &CreatePartitionsRequest{Version: version} case 42: - return &DeleteGroupsRequest{} + return &DeleteGroupsRequest{Version: version} case 44: - return &IncrementalAlterConfigsRequest{} + return &IncrementalAlterConfigsRequest{Version: version} case 45: - return &AlterPartitionReassignmentsRequest{} + return &AlterPartitionReassignmentsRequest{Version: version} case 46: - return &ListPartitionReassignmentsRequest{} + return &ListPartitionReassignmentsRequest{Version: version} case 47: - return &DeleteOffsetsRequest{} + return &DeleteOffsetsRequest{Version: version} case 48: - return &DescribeClientQuotasRequest{} + return &DescribeClientQuotasRequest{Version: version} case 49: - return &AlterClientQuotasRequest{} + return &AlterClientQuotasRequest{Version: version} case 50: - return &DescribeUserScramCredentialsRequest{} + return &DescribeUserScramCredentialsRequest{Version: version} case 51: - return &AlterUserScramCredentialsRequest{} + return &AlterUserScramCredentialsRequest{Version: version} } return nil } diff --git a/sasl_authenticate_request.go b/sasl_authenticate_request.go index 5bb0988ea..3a562a53b 100644 --- a/sasl_authenticate_request.go +++ b/sasl_authenticate_request.go @@ -31,6 +31,10 @@ func (r *SaslAuthenticateRequest) headerVersion() int16 { return 1 } +func (r *SaslAuthenticateRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 1 +} + func (r *SaslAuthenticateRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/sasl_authenticate_response.go b/sasl_authenticate_response.go index 37c8e45da..ae52cde1c 100644 --- a/sasl_authenticate_response.go +++ b/sasl_authenticate_response.go @@ -59,6 +59,10 @@ func (r *SaslAuthenticateResponse) headerVersion() int16 { return 0 } +func (r *SaslAuthenticateResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 1 +} + func (r *SaslAuthenticateResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/sasl_handshake_request.go b/sasl_handshake_request.go index 74dc3072f..410a5b0ea 100644 --- a/sasl_handshake_request.go +++ b/sasl_handshake_request.go @@ -33,6 +33,15 @@ func (r *SaslHandshakeRequest) headerVersion() int16 { return 1 } +func (r *SaslHandshakeRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 1 +} + func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion { - return V0_10_0_0 + switch r.Version { + case 1: + return V1_0_0_0 + default: + return V0_10_0_0 + } } diff --git a/sasl_handshake_response.go b/sasl_handshake_response.go index 69dfc3178..502732cbd 100644 --- a/sasl_handshake_response.go +++ b/sasl_handshake_response.go @@ -1,6 +1,7 @@ package sarama type SaslHandshakeResponse struct { + Version int16 Err KError EnabledMechanisms []string } @@ -30,13 +31,22 @@ func (r *SaslHandshakeResponse) key() int16 { } func (r *SaslHandshakeResponse) version() int16 { - return 0 + return r.Version } func (r *SaslHandshakeResponse) headerVersion() int16 { return 0 } +func (r *SaslHandshakeResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 1 +} + func (r *SaslHandshakeResponse) requiredVersion() KafkaVersion { - return V0_10_0_0 + switch r.Version { + case 1: + return V1_0_0_0 + default: + return V0_10_0_0 + } } diff --git a/sync_group_request.go b/sync_group_request.go index 33ed3bacc..6b6b2d0ca 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -123,6 +123,10 @@ func (r *SyncGroupRequest) headerVersion() int16 { return 1 } +func (r *SyncGroupRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *SyncGroupRequest) requiredVersion() KafkaVersion { switch { case r.Version >= 3: diff --git a/sync_group_response.go b/sync_group_response.go index 5c9cc1a6a..3661ebcdd 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -59,6 +59,10 @@ func (r *SyncGroupResponse) headerVersion() int16 { return 0 } +func (r *SyncGroupResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 3 +} + func (r *SyncGroupResponse) requiredVersion() KafkaVersion { switch r.Version { case 1, 2, 3: diff --git a/txn_offset_commit_request.go b/txn_offset_commit_request.go index c4043a335..9a1cffca7 100644 --- a/txn_offset_commit_request.go +++ b/txn_offset_commit_request.go @@ -1,6 +1,7 @@ package sarama type TxnOffsetCommitRequest struct { + Version int16 TransactionalID string GroupID string ProducerID int64 @@ -88,13 +89,17 @@ func (a *TxnOffsetCommitRequest) key() int16 { } func (a *TxnOffsetCommitRequest) version() int16 { - return 0 + return a.Version } func (a *TxnOffsetCommitRequest) headerVersion() int16 { return 1 } +func (a *TxnOffsetCommitRequest) isValidVersion() bool { + return a.Version == 0 +} + func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/txn_offset_commit_response.go b/txn_offset_commit_response.go index 2767f8a68..8248b801f 100644 --- a/txn_offset_commit_response.go +++ b/txn_offset_commit_response.go @@ -5,6 +5,7 @@ import ( ) type TxnOffsetCommitResponse struct { + Version int16 ThrottleTime time.Duration Topics map[string][]*PartitionError } @@ -75,13 +76,17 @@ func (a *TxnOffsetCommitResponse) key() int16 { } func (a *TxnOffsetCommitResponse) version() int16 { - return 0 + return a.Version } func (a *TxnOffsetCommitResponse) headerVersion() int16 { return 0 } +func (a *TxnOffsetCommitResponse) isValidVersion() bool { + return a.Version == 0 +} + func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } From fa37d61a651fca7597c9e6993ab7b69e27190b3f Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 2 Aug 2023 00:07:54 +0100 Subject: [PATCH 2/8] fix(proto): use DescribeLogDirsRequest v1 This is identical to v0 but can used from broker 2.0.0.0 onwards Signed-off-by: Dominic Evans --- admin.go | 6 +++++- describe_log_dirs_request.go | 5 ++++- describe_log_dirs_response.go | 5 ++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/admin.go b/admin.go index 29eeca1c6..31b903fe6 100644 --- a/admin.go +++ b/admin.go @@ -1061,7 +1061,11 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 defer wg.Done() _ = b.Open(conf) // Ensure that broker is opened - response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{}) + request := &DescribeLogDirsRequest{} + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } + response, err := b.DescribeLogDirs(request) if err != nil { errChan <- err return diff --git a/describe_log_dirs_request.go b/describe_log_dirs_request.go index 9dcdf56d2..a6613c320 100644 --- a/describe_log_dirs_request.go +++ b/describe_log_dirs_request.go @@ -83,9 +83,12 @@ func (r *DescribeLogDirsRequest) headerVersion() int16 { } func (r *DescribeLogDirsRequest) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 1 } func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion { + if r.Version > 0 { + return V2_0_0_0 + } return V1_0_0_0 } diff --git a/describe_log_dirs_response.go b/describe_log_dirs_response.go index 555b2b1b2..41b4968da 100644 --- a/describe_log_dirs_response.go +++ b/describe_log_dirs_response.go @@ -66,10 +66,13 @@ func (r *DescribeLogDirsResponse) headerVersion() int16 { } func (r *DescribeLogDirsResponse) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 1 } func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion { + if r.Version > 0 { + return V2_0_0_0 + } return V1_0_0_0 } From 40fa609edcc66f2fc343c7b5cdcabb7739086aac Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 2 Aug 2023 17:47:16 +0100 Subject: [PATCH 3/8] fix(proto): ensure req+resp requiredVersion match Testing uncovered a few mismatches between the requiredVersion implementation in the request and response for these types Also fix wrong key() in alter_configs_response! Signed-off-by: Dominic Evans --- alter_configs_response.go | 2 +- heartbeat_request.go | 11 ++++++++--- heartbeat_response.go | 9 +++++++-- leave_group_request.go | 10 ++++++---- leave_group_response.go | 11 ++++++++--- sync_group_request.go | 11 ++++++++--- sync_group_response.go | 9 +++++++-- 7 files changed, 45 insertions(+), 18 deletions(-) diff --git a/alter_configs_response.go b/alter_configs_response.go index 72ffcc9f7..99888840e 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -101,7 +101,7 @@ func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) e } func (a *AlterConfigsResponse) key() int16 { - return 32 + return 33 } func (a *AlterConfigsResponse) version() int16 { diff --git a/heartbeat_request.go b/heartbeat_request.go index 77a19f816..ea1b19279 100644 --- a/heartbeat_request.go +++ b/heartbeat_request.go @@ -65,9 +65,14 @@ func (r *HeartbeatRequest) isValidVersion() bool { } func (r *HeartbeatRequest) requiredVersion() KafkaVersion { - switch { - case r.Version >= 3: + switch r.Version { + case 3: return V2_3_0_0 + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_9_0_0 } - return V0_9_0_0 } diff --git a/heartbeat_response.go b/heartbeat_response.go index d6dd07102..c4952d94a 100644 --- a/heartbeat_response.go +++ b/heartbeat_response.go @@ -51,10 +51,15 @@ func (r *HeartbeatResponse) isValidVersion() bool { func (r *HeartbeatResponse) requiredVersion() KafkaVersion { switch r.Version { - case 1, 2, 3: + case 3: return V2_3_0_0 + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_9_0_0 } - return V0_9_0_0 } func (r *HeartbeatResponse) throttleTime() time.Duration { diff --git a/leave_group_request.go b/leave_group_request.go index 7dc5005a1..306cb3c2b 100644 --- a/leave_group_request.go +++ b/leave_group_request.go @@ -86,11 +86,13 @@ func (r *LeaveGroupRequest) isValidVersion() bool { } func (r *LeaveGroupRequest) requiredVersion() KafkaVersion { - switch { - case r.Version >= 2: + switch r.Version { + case 3: + return V2_4_0_0 + case 2: return V2_0_0_0 - case r.Version >= 1: - return V1_0_0_0 + case 1: + return V0_11_0_0 default: return V0_9_0_0 } diff --git a/leave_group_response.go b/leave_group_response.go index 586debeae..faa263895 100644 --- a/leave_group_response.go +++ b/leave_group_response.go @@ -91,10 +91,15 @@ func (r *LeaveGroupResponse) isValidVersion() bool { func (r *LeaveGroupResponse) requiredVersion() KafkaVersion { switch r.Version { - case 1, 2, 3: - return V2_3_0_0 + case 3: + return V2_4_0_0 + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_9_0_0 } - return V0_9_0_0 } func (r *LeaveGroupResponse) throttleTime() time.Duration { diff --git a/sync_group_request.go b/sync_group_request.go index 6b6b2d0ca..581b1ea29 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -128,11 +128,16 @@ func (r *SyncGroupRequest) isValidVersion() bool { } func (r *SyncGroupRequest) requiredVersion() KafkaVersion { - switch { - case r.Version >= 3: + switch r.Version { + case 3: return V2_3_0_0 + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_9_0_0 } - return V0_9_0_0 } func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) { diff --git a/sync_group_response.go b/sync_group_response.go index 3661ebcdd..ca47f790d 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -65,10 +65,15 @@ func (r *SyncGroupResponse) isValidVersion() bool { func (r *SyncGroupResponse) requiredVersion() KafkaVersion { switch r.Version { - case 1, 2, 3: + case 3: return V2_3_0_0 + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_9_0_0 } - return V0_9_0_0 } func (r *SyncGroupResponse) throttleTime() time.Duration { From c240c675eb8125bf012bb6391efaaad3a56bbcaf Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 2 Aug 2023 17:51:15 +0100 Subject: [PATCH 4/8] fix(proto): extend txn types for identical versions Co-authored-by: Mark Hindess Signed-off-by: Dominic Evans --- add_offsets_to_txn_request.go | 11 +++++++++-- add_offsets_to_txn_response.go | 11 +++++++++-- add_partitions_to_txn_request.go | 11 +++++++++-- add_partitions_to_txn_response.go | 11 +++++++++-- end_txn_request.go | 11 +++++++++-- end_txn_response.go | 11 +++++++++-- init_producer_id_request.go | 14 ++++++-------- init_producer_id_response.go | 12 ++++++------ txn_offset_commit_request.go | 9 +++++++-- txn_offset_commit_response.go | 9 +++++++-- 10 files changed, 80 insertions(+), 30 deletions(-) diff --git a/add_offsets_to_txn_request.go b/add_offsets_to_txn_request.go index fe6f773bb..508a5fc6e 100644 --- a/add_offsets_to_txn_request.go +++ b/add_offsets_to_txn_request.go @@ -54,9 +54,16 @@ func (a *AddOffsetsToTxnRequest) headerVersion() int16 { } func (a *AddOffsetsToTxnRequest) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 2 } func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 2: + return V2_7_0_0 + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } diff --git a/add_offsets_to_txn_response.go b/add_offsets_to_txn_response.go index 8df6f1ca6..d6e52cf20 100644 --- a/add_offsets_to_txn_response.go +++ b/add_offsets_to_txn_response.go @@ -46,11 +46,18 @@ func (a *AddOffsetsToTxnResponse) headerVersion() int16 { } func (a *AddOffsetsToTxnResponse) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 2 } func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 2: + return V2_7_0_0 + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *AddOffsetsToTxnResponse) throttleTime() time.Duration { diff --git a/add_partitions_to_txn_request.go b/add_partitions_to_txn_request.go index 61d7b4fde..3e2c63c64 100644 --- a/add_partitions_to_txn_request.go +++ b/add_partitions_to_txn_request.go @@ -78,9 +78,16 @@ func (a *AddPartitionsToTxnRequest) headerVersion() int16 { } func (a *AddPartitionsToTxnRequest) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 2 } func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 2: + return V2_7_0_0 + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } diff --git a/add_partitions_to_txn_response.go b/add_partitions_to_txn_response.go index 2c38c57d1..5cbc1b044 100644 --- a/add_partitions_to_txn_response.go +++ b/add_partitions_to_txn_response.go @@ -85,11 +85,18 @@ func (a *AddPartitionsToTxnResponse) headerVersion() int16 { } func (a *AddPartitionsToTxnResponse) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 2 } func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 2: + return V2_7_0_0 + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *AddPartitionsToTxnResponse) throttleTime() time.Duration { diff --git a/end_txn_request.go b/end_txn_request.go index e0e3872c6..638099a5d 100644 --- a/end_txn_request.go +++ b/end_txn_request.go @@ -51,9 +51,16 @@ func (r *EndTxnRequest) headerVersion() int16 { } func (a *EndTxnRequest) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 2 } func (a *EndTxnRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 2: + return V2_7_0_0 + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } diff --git a/end_txn_response.go b/end_txn_response.go index 7fcfb0406..54597df8c 100644 --- a/end_txn_response.go +++ b/end_txn_response.go @@ -45,11 +45,18 @@ func (r *EndTxnResponse) headerVersion() int16 { } func (e *EndTxnResponse) isValidVersion() bool { - return e.Version == 0 + return e.Version >= 0 && e.Version <= 2 } func (e *EndTxnResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch e.Version { + case 2: + return V2_7_0_0 + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *EndTxnResponse) throttleTime() time.Duration { diff --git a/init_producer_id_request.go b/init_producer_id_request.go index ff3f4de78..7e8f82a82 100644 --- a/init_producer_id_request.go +++ b/init_producer_id_request.go @@ -85,21 +85,19 @@ func (i *InitProducerIDRequest) headerVersion() int16 { } func (i *InitProducerIDRequest) isValidVersion() bool { - return i.Version >= 0 && i.Version <= 3 + return i.Version >= 0 && i.Version <= 4 } func (i *InitProducerIDRequest) requiredVersion() KafkaVersion { switch i.Version { - case 2: - // Added tagged fields - return V2_4_0_0 + case 4: + return V2_7_0_0 case 3: - // Added ProducerID/Epoch return V2_5_0_0 - case 0: - fallthrough + case 2: + return V2_4_0_0 case 1: - fallthrough + return V2_0_0_0 default: return V0_11_0_0 } diff --git a/init_producer_id_response.go b/init_producer_id_response.go index 22e56e80c..256077189 100644 --- a/init_producer_id_response.go +++ b/init_producer_id_response.go @@ -70,19 +70,19 @@ func (i *InitProducerIDResponse) headerVersion() int16 { } func (i *InitProducerIDResponse) isValidVersion() bool { - return i.Version >= 0 && i.Version <= 3 + return i.Version >= 0 && i.Version <= 4 } func (i *InitProducerIDResponse) requiredVersion() KafkaVersion { switch i.Version { - case 2: - fallthrough + case 4: + return V2_7_0_0 case 3: + return V2_5_0_0 + case 2: return V2_4_0_0 - case 0: - fallthrough case 1: - fallthrough + return V2_0_0_0 default: return V0_11_0_0 } diff --git a/txn_offset_commit_request.go b/txn_offset_commit_request.go index 9a1cffca7..6b9b4e6ab 100644 --- a/txn_offset_commit_request.go +++ b/txn_offset_commit_request.go @@ -97,11 +97,16 @@ func (a *TxnOffsetCommitRequest) headerVersion() int16 { } func (a *TxnOffsetCommitRequest) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 1 } func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } type PartitionOffsetMetadata struct { diff --git a/txn_offset_commit_response.go b/txn_offset_commit_response.go index 8248b801f..6f3ba1ce4 100644 --- a/txn_offset_commit_response.go +++ b/txn_offset_commit_response.go @@ -84,11 +84,16 @@ func (a *TxnOffsetCommitResponse) headerVersion() int16 { } func (a *TxnOffsetCommitResponse) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 1 } func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *TxnOffsetCommitResponse) throttleTime() time.Duration { From 3b8260652435c631ccac18e28025fb3f83a077a7 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 2 Aug 2023 18:08:08 +0100 Subject: [PATCH 5/8] fix(proto): correct consumer metadata shim This is really just proxying to FindCoordinatorRequest/FindCoordinatorResponse, but for now just copy in the same isValidVersion/requiredVersion code and ensure we're passing Version to and from it correctly. Signed-off-by: Dominic Evans --- consumer_metadata_request.go | 12 ++++++++++-- consumer_metadata_response.go | 13 ++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index 3bef616d2..ef6b9e721 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -10,6 +10,7 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error { tmp := new(FindCoordinatorRequest) tmp.CoordinatorKey = r.ConsumerGroup tmp.CoordinatorType = CoordinatorGroup + tmp.Version = r.Version return tmp.encode(pe) } @@ -35,9 +36,16 @@ func (r *ConsumerMetadataRequest) headerVersion() int16 { } func (r *ConsumerMetadataRequest) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 2 } func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion { - return V0_8_2_0 + switch r.Version { + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } } diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index a8e95b2ce..d99209e3b 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -54,7 +54,7 @@ func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { } tmp := &FindCoordinatorResponse{ - Version: 0, + Version: r.Version, Err: r.Err, Coordinator: r.Coordinator, } @@ -79,9 +79,16 @@ func (r *ConsumerMetadataResponse) headerVersion() int16 { } func (r *ConsumerMetadataResponse) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 2 } func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion { - return V0_8_2_0 + switch r.Version { + case 2: + return V2_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } } From ee2872c86000d8d3683257d1ebecd9ec66126ca3 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 2 Aug 2023 22:01:16 +0100 Subject: [PATCH 6/8] fix(admin): remove group member needs >= 2.4.0 Signed-off-by: Dominic Evans --- admin.go | 4 ++++ functional_consumer_staticmembership_test.go | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/admin.go b/admin.go index 31b903fe6..ef5f481a4 100644 --- a/admin.go +++ b/admin.go @@ -1212,6 +1212,10 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie } func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) { + if !ca.conf.Version.IsAtLeast(V2_4_0_0) { + return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0") + } + controller, err := ca.client.Coordinator(groupId) if err != nil { return nil, err diff --git a/functional_consumer_staticmembership_test.go b/functional_consumer_staticmembership_test.go index 946169b10..6b8f3245d 100644 --- a/functional_consumer_staticmembership_test.go +++ b/functional_consumer_staticmembership_test.go @@ -67,7 +67,7 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) { } func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { - checkKafkaVersion(t, "2.3.0") + checkKafkaVersion(t, "2.4.0") setupFunctionalTest(t) defer teardownFunctionalTest(t) groupID := testFuncConsumerGroupID(t) @@ -76,7 +76,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { config1 := NewTestConfig() config1.ClientID = "M1" - config1.Version = V2_3_0_0 + config1.Version = V2_4_0_0 config1.Consumer.Offsets.Initial = OffsetNewest config1.Consumer.Group.InstanceId = "Instance1" m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4") @@ -84,7 +84,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { config2 := NewTestConfig() config2.ClientID = "M2" - config2.Version = V2_3_0_0 + config2.Version = V2_4_0_0 config2.Consumer.Offsets.Initial = OffsetNewest config2.Consumer.Group.InstanceId = "Instance2" m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4") From b8cc2b108cf58a24f1abb5602e3b471790ac48fd Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 1 Aug 2023 17:28:00 +0100 Subject: [PATCH 7/8] feat(proto): add test around supported versions Initially seeded with only the protocol versions required for Kafka v1.1, check we've implemented the expected versions and they pass the isValidVersion and the requiredVersion checks as expected. Signed-off-by: Dominic Evans --- request_test.go | 244 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 244 insertions(+) diff --git a/request_test.go b/request_test.go index 193806994..c1abd5bed 100644 --- a/request_test.go +++ b/request_test.go @@ -2,12 +2,256 @@ package sarama import ( "bytes" + "fmt" "reflect" "testing" "github.com/davecgh/go-spew/spew" + assert "github.com/stretchr/testify/require" ) +var names = map[int16]string{ + 0: "ProduceRequest", + 1: "FetchRequest", + 2: "ListOffsetsRequest", + 3: "MetadataRequest", + 4: "LeaderAndIsrRequest", + 5: "StopReplicaRequest", + 6: "UpdateMetadataRequest", + 7: "ControlledShutdownRequest", + 8: "OffsetCommitRequest", + 9: "OffsetFetchRequest", + 10: "FindCoordinatorRequest", + 11: "JoinGroupRequest", + 12: "HeartbeatRequest", + 13: "LeaveGroupRequest", + 14: "SyncGroupRequest", + 15: "DescribeGroupsRequest", + 16: "ListGroupsRequest", + 17: "SaslHandshakeRequest", + 18: "ApiVersionsRequest", + 19: "CreateTopicsRequest", + 20: "DeleteTopicsRequest", + 21: "DeleteRecordsRequest", + 22: "InitProducerIdRequest", + 23: "OffsetForLeaderEpochRequest", + 24: "AddPartitionsToTxnRequest", + 25: "AddOffsetsToTxnRequest", + 26: "EndTxnRequest", + 27: "WriteTxnMarkersRequest", + 28: "TxnOffsetCommitRequest", + 29: "DescribeAclsRequest", + 30: "CreateAclsRequest", + 31: "DeleteAclsRequest", + 32: "DescribeConfigsRequest", + 33: "AlterConfigsRequest", + 34: "AlterReplicaLogDirsRequest", + 35: "DescribeLogDirsRequest", + 36: "SaslAuthenticateRequest", + 37: "CreatePartitionsRequest", + 38: "CreateDelegationTokenRequest", + 39: "RenewDelegationTokenRequest", + 40: "ExpireDelegationTokenRequest", + 41: "DescribeDelegationTokenRequest", + 42: "DeleteGroupsRequest", + 43: "ElectLeadersRequest", + 44: "IncrementalAlterConfigsRequest", + 45: "AlterPartitionReassignmentsRequest", + 46: "ListPartitionReassignmentsRequest", + 47: "OffsetDeleteRequest", + 48: "DescribeClientQuotasRequest", + 49: "AlterClientQuotasRequest", + 50: "DescribeUserScramCredentialsRequest", + 51: "AlterUserScramCredentialsRequest", + 52: "VoteRequest", + 53: "BeginQuorumEpochRequest", + 54: "EndQuorumEpochRequest", + 55: "DescribeQuorumRequest", + 56: "AlterPartitionRequest", + 57: "UpdateFeaturesRequest", + 58: "EnvelopeRequest", + 59: "FetchSnapshotRequest", + 60: "DescribeClusterRequest", + 61: "DescribeProducersRequest", + 62: "BrokerRegistrationRequest", + 63: "BrokerHeartbeatRequest", + 64: "UnregisterBrokerRequest", + 65: "DescribeTransactionsRequest", + 66: "ListTransactionsRequest", + 67: "AllocateProducerIdsRequest", + 68: "ConsumerGroupHeartbeatRequest", +} + +// allocateResponseBody is a test-only clone of allocateBody. There's no +// central registry of types, so we can't do this using reflection for Response +// types and assuming that the struct is identically named, just with Response +// instead of Request. +func allocateResponseBody(req protocolBody) protocolBody { + key := req.key() + version := req.version() + switch key { + case 0: + return &ProduceResponse{Version: version} + case 1: + return &FetchResponse{Version: version} + case 2: + return &OffsetResponse{Version: version} + case 3: + return &MetadataResponse{Version: version} + case 8: + return &OffsetCommitResponse{Version: version} + case 9: + return &OffsetFetchResponse{Version: version} + case 10: + return &FindCoordinatorResponse{Version: version} + case 11: + return &JoinGroupResponse{Version: version} + case 12: + return &HeartbeatResponse{Version: version} + case 13: + return &LeaveGroupResponse{Version: version} + case 14: + return &SyncGroupResponse{Version: version} + case 15: + return &DescribeGroupsResponse{Version: version} + case 16: + return &ListGroupsResponse{Version: version} + case 17: + return &SaslHandshakeResponse{Version: version} + case 18: + return &ApiVersionsResponse{Version: version} + case 19: + return &CreateTopicsResponse{Version: version} + case 20: + return &DeleteTopicsResponse{Version: version} + case 21: + return &DeleteRecordsResponse{Version: version} + case 22: + return &InitProducerIDResponse{Version: version} + case 24: + return &AddPartitionsToTxnResponse{Version: version} + case 25: + return &AddOffsetsToTxnResponse{Version: version} + case 26: + return &EndTxnResponse{Version: version} + case 28: + return &TxnOffsetCommitResponse{Version: version} + case 29: + return &DescribeAclsResponse{Version: version} + case 30: + return &CreateAclsResponse{Version: version} + case 31: + return &DeleteAclsResponse{Version: version} + case 32: + return &DescribeConfigsResponse{Version: version} + case 33: + return &AlterConfigsResponse{Version: version} + case 35: + return &DescribeLogDirsResponse{Version: version} + case 36: + return &SaslAuthenticateResponse{Version: version} + case 37: + return &CreatePartitionsResponse{Version: version} + case 42: + return &DeleteGroupsResponse{Version: version} + case 44: + return &IncrementalAlterConfigsResponse{Version: version} + case 45: + return &AlterPartitionReassignmentsResponse{Version: version} + case 46: + return &ListPartitionReassignmentsResponse{Version: version} + case 47: + return &DeleteOffsetsResponse{Version: version} + case 48: + return &DescribeClientQuotasResponse{Version: version} + case 49: + return &AlterClientQuotasResponse{Version: version} + case 50: + return &DescribeUserScramCredentialsResponse{Version: version} + case 51: + return &AlterUserScramCredentialsResponse{Version: version} + } + return nil +} + +func TestAllocateBodyProtocolVersions(t *testing.T) { + type test struct { + version KafkaVersion + apiVersions map[int16]int16 + } + + tests := []test{ + { + V1_1_0_0, + map[int16]int16{ + 0: 5, + 1: 7, + 2: 2, + 3: 5, + 4: 1, + 5: 0, + 6: 4, + 7: 1, + 8: 3, + 9: 3, + 10: 1, + 11: 2, + 12: 1, + 13: 1, + 14: 1, + 15: 1, + 16: 1, + 17: 1, + 18: 1, + 19: 2, + 20: 1, + 21: 0, + 22: 0, + 23: 0, + 24: 0, + 25: 0, + 26: 0, + 27: 0, + 28: 0, + 29: 0, + 30: 0, + 31: 0, + 32: 1, + 33: 0, + 34: 0, + 35: 0, + 36: 0, + 37: 0, + 38: 0, + 39: 0, + 40: 0, + 41: 0, + 42: 0, + }, + }, + } + + for _, tt := range tests { + for key, version := range tt.apiVersions { + t.Run(fmt.Sprintf("%s-%s", tt.version.String(), names[key]), func(t *testing.T) { + req := allocateBody(key, version) + if req == nil { + t.Skipf("apikey %d is not implemented", key) + } + resp := allocateResponseBody(req) + assert.NotNil(t, resp, fmt.Sprintf("%s has no matching response type in allocateResponseBody", reflect.TypeOf(req))) + assert.Equal(t, req.isValidVersion(), resp.isValidVersion(), fmt.Sprintf("%s isValidVersion should match %s", reflect.TypeOf(req), reflect.TypeOf(resp))) + for _, body := range []protocolBody{req, resp} { + assert.Equal(t, key, body.key()) + assert.Equal(t, version, body.version()) + assert.True(t, body.isValidVersion(), fmt.Sprintf("%s v%d is not supported, but expected for KafkaVersion %s", reflect.TypeOf(body), version, tt.version)) + assert.True(t, tt.version.IsAtLeast(body.requiredVersion()), fmt.Sprintf("KafkaVersion %s should be enough for %s v%d", tt.version, reflect.TypeOf(body), version)) + } + }) + } + } +} + // not specific to request tests, just helper functions for testing structures that // implement the encoder or decoder interfaces that needed somewhere to live From a9126add225283b6f1f33291cf5e9e97a7da67e1 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 2 Aug 2023 22:22:30 +0100 Subject: [PATCH 8/8] fix(proto): use DeleteRecordsRequest v1 Signed-off-by: Dominic Evans --- admin.go | 3 +++ delete_records_request.go | 9 +++++++-- delete_records_response.go | 9 +++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/admin.go b/admin.go index ef5f481a4..93ab7b863 100644 --- a/admin.go +++ b/admin.go @@ -605,6 +605,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i Topics: topics, Timeout: ca.conf.Admin.Timeout, } + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } rsp, err := broker.DeleteRecords(request) if err != nil { errs = append(errs, err) diff --git a/delete_records_request.go b/delete_records_request.go index 687b042ad..3ca2146af 100644 --- a/delete_records_request.go +++ b/delete_records_request.go @@ -83,11 +83,16 @@ func (d *DeleteRecordsRequest) headerVersion() int16 { } func (d *DeleteRecordsRequest) isValidVersion() bool { - return d.Version == 0 + return d.Version >= 0 && d.Version <= 1 } func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } type DeleteRecordsRequestTopic struct { diff --git a/delete_records_response.go b/delete_records_response.go index cb287524c..2d7db885b 100644 --- a/delete_records_response.go +++ b/delete_records_response.go @@ -85,11 +85,16 @@ func (d *DeleteRecordsResponse) headerVersion() int16 { } func (d *DeleteRecordsResponse) isValidVersion() bool { - return d.Version == 0 + return d.Version >= 0 && d.Version <= 1 } func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } func (r *DeleteRecordsResponse) throttleTime() time.Duration {