diff --git a/request_test.go b/request_test.go index 193806994..1c34f1a81 100644 --- a/request_test.go +++ b/request_test.go @@ -2,12 +2,182 @@ package sarama import ( "bytes" + "fmt" "reflect" "testing" "github.com/davecgh/go-spew/spew" + assert "github.com/stretchr/testify/require" ) +// allocateResponseBody is a test-only clone of allocateBody. There's no +// central registry of types, so we can't do this using reflection for Response +// types and assuming that the struct is identically named, just with Response +// instead of Request. +func allocateResponseBody(req protocolBody) protocolBody { + key := req.key() + version := req.version() + switch key { + case 0: + return &ProduceRequest{Version: version} + case 1: + return &FetchRequest{Version: version} + case 2: + return &OffsetRequest{Version: version} + case 3: + return &MetadataRequest{Version: version} + case 8: + return &OffsetCommitRequest{Version: version} + case 9: + return &OffsetFetchRequest{Version: version} + case 10: + return &FindCoordinatorRequest{Version: version} + case 11: + return &JoinGroupRequest{Version: version} + case 12: + return &HeartbeatRequest{Version: version} + case 13: + return &LeaveGroupRequest{Version: version} + case 14: + return &SyncGroupRequest{Version: version} + case 15: + return &DescribeGroupsRequest{Version: version} + case 16: + return &ListGroupsRequest{Version: version} + case 17: + return &SaslHandshakeRequest{Version: version} + case 18: + return &ApiVersionsRequest{Version: version} + case 19: + return &CreateTopicsRequest{Version: version} + case 20: + return &DeleteTopicsRequest{Version: version} + case 21: + return &DeleteRecordsRequest{Version: version} + case 22: + return &InitProducerIDRequest{Version: version} + case 24: + return &AddPartitionsToTxnRequest{Version: version} + case 25: + return &AddOffsetsToTxnRequest{Version: version} + case 26: + return &EndTxnRequest{Version: version} + case 28: + return &TxnOffsetCommitRequest{Version: version} + case 29: + return &DescribeAclsRequest{Version: int(version)} + case 30: + return &CreateAclsRequest{Version: version} + case 31: + return &DeleteAclsRequest{Version: int(version)} + case 32: + return &DescribeConfigsRequest{Version: version} + case 33: + return &AlterConfigsRequest{Version: version} + case 35: + return &DescribeLogDirsRequest{Version: version} + case 36: + return &SaslAuthenticateRequest{Version: version} + case 37: + return &CreatePartitionsRequest{Version: version} + case 42: + return &DeleteGroupsRequest{Version: version} + case 44: + return &IncrementalAlterConfigsRequest{Version: version} + case 45: + return &AlterPartitionReassignmentsRequest{Version: version} + case 46: + return &ListPartitionReassignmentsRequest{Version: version} + case 47: + return &DeleteOffsetsRequest{Version: version} + case 48: + return &DescribeClientQuotasRequest{Version: version} + case 49: + return &AlterClientQuotasRequest{Version: version} + case 50: + return &DescribeUserScramCredentialsRequest{Version: version} + case 51: + return &AlterUserScramCredentialsRequest{Version: version} + } + return nil +} + +func TestAllocateBodyVersions(t *testing.T) { + type test struct { + version KafkaVersion + apiVersions map[int16]int16 + } + + tests := []test{ + { + V1_1_0_0, + map[int16]int16{ + 0: 5, + 1: 7, + 2: 2, + 3: 5, + 4: 1, + 5: 0, + 6: 4, + 7: 1, + 8: 3, + 9: 3, + 10: 1, + 11: 2, + 12: 1, + 13: 1, + 14: 1, + 15: 1, + 16: 1, + 17: 1, + 18: 1, + 19: 2, + 20: 1, + 21: 0, + 22: 0, + 23: 0, + 24: 0, + 25: 0, + 26: 0, + 27: 0, + 28: 0, + 29: 0, + 30: 0, + 31: 0, + 32: 1, + 33: 0, + 34: 0, + 35: 0, + 36: 0, + 37: 0, + 38: 0, + 39: 0, + 40: 0, + 41: 0, + 42: 0, + }, + }, + } + + for _, tt := range tests { + for key, version := range tt.apiVersions { + t.Run(fmt.Sprintf("%s-apikey-%d", tt.version.String(), key), func(t *testing.T) { + req := allocateBody(key, version) + if req == nil { + t.Skipf("apikey %d is not implemented", key) + } + resp := allocateResponseBody(req) + assert.NotNil(t, resp, fmt.Sprintf("%s has no matching response type in allocateResponseBody", reflect.TypeOf(req))) + for _, body := range []protocolBody{req, resp} { + assert.Equal(t, version, body.version()) + assert.True(t, body.isValidVersion(), fmt.Sprintf("%s v%d is not supported", reflect.TypeOf(body), version)) + assert.True(t, tt.version.IsAtLeast(body.requiredVersion()), fmt.Sprintf("KafkaVersion %s should be enough for %s v%d", tt.version, reflect.TypeOf(body), version)) + } + }) + } + } +} + // not specific to request tests, just helper functions for testing structures that // implement the encoder or decoder interfaces that needed somewhere to live