Skip to content

Commit

Permalink
feat(proto): add test around supported versions
Browse files Browse the repository at this point in the history
Initially seeded with only the protocol versions required for Kafka
v1.1, check we've implemented the expected versions and they pass the
isValidVersion and the requiredVersion checks as expected.

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 2, 2023
1 parent d44d056 commit 7b96841
Showing 1 changed file with 244 additions and 0 deletions.
244 changes: 244 additions & 0 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,256 @@ package sarama

import (
"bytes"
"fmt"
"reflect"
"testing"

"github.com/davecgh/go-spew/spew"
assert "github.com/stretchr/testify/require"
)

var names = map[int16]string{
0: "ProduceRequest",
1: "FetchRequest",
2: "ListOffsetsRequest",
3: "MetadataRequest",
4: "LeaderAndIsrRequest",
5: "StopReplicaRequest",
6: "UpdateMetadataRequest",
7: "ControlledShutdownRequest",
8: "OffsetCommitRequest",
9: "OffsetFetchRequest",
10: "FindCoordinatorRequest",
11: "JoinGroupRequest",
12: "HeartbeatRequest",
13: "LeaveGroupRequest",
14: "SyncGroupRequest",
15: "DescribeGroupsRequest",
16: "ListGroupsRequest",
17: "SaslHandshakeRequest",
18: "ApiVersionsRequest",
19: "CreateTopicsRequest",
20: "DeleteTopicsRequest",
21: "DeleteRecordsRequest",
22: "InitProducerIdRequest",
23: "OffsetForLeaderEpochRequest",
24: "AddPartitionsToTxnRequest",
25: "AddOffsetsToTxnRequest",
26: "EndTxnRequest",
27: "WriteTxnMarkersRequest",
28: "TxnOffsetCommitRequest",
29: "DescribeAclsRequest",
30: "CreateAclsRequest",
31: "DeleteAclsRequest",
32: "DescribeConfigsRequest",
33: "AlterConfigsRequest",
34: "AlterReplicaLogDirsRequest",
35: "DescribeLogDirsRequest",
36: "SaslAuthenticateRequest",
37: "CreatePartitionsRequest",
38: "CreateDelegationTokenRequest",
39: "RenewDelegationTokenRequest",
40: "ExpireDelegationTokenRequest",
41: "DescribeDelegationTokenRequest",
42: "DeleteGroupsRequest",
43: "ElectLeadersRequest",
44: "IncrementalAlterConfigsRequest",
45: "AlterPartitionReassignmentsRequest",
46: "ListPartitionReassignmentsRequest",
47: "OffsetDeleteRequest",
48: "DescribeClientQuotasRequest",
49: "AlterClientQuotasRequest",
50: "DescribeUserScramCredentialsRequest",
51: "AlterUserScramCredentialsRequest",
52: "VoteRequest",
53: "BeginQuorumEpochRequest",
54: "EndQuorumEpochRequest",
55: "DescribeQuorumRequest",
56: "AlterPartitionRequest",
57: "UpdateFeaturesRequest",
58: "EnvelopeRequest",
59: "FetchSnapshotRequest",
60: "DescribeClusterRequest",
61: "DescribeProducersRequest",
62: "BrokerRegistrationRequest",
63: "BrokerHeartbeatRequest",
64: "UnregisterBrokerRequest",
65: "DescribeTransactionsRequest",
66: "ListTransactionsRequest",
67: "AllocateProducerIdsRequest",
68: "ConsumerGroupHeartbeatRequest",
}

// allocateResponseBody is a test-only clone of allocateBody. There's no
// central registry of types, so we can't do this using reflection for Response
// types and assuming that the struct is identically named, just with Response
// instead of Request.
func allocateResponseBody(req protocolBody) protocolBody {
key := req.key()
version := req.version()
switch key {
case 0:
return &ProduceResponse{Version: version}
case 1:
return &FetchResponse{Version: version}
case 2:
return &OffsetResponse{Version: version}
case 3:
return &MetadataResponse{Version: version}
case 8:
return &OffsetCommitResponse{Version: version}
case 9:
return &OffsetFetchResponse{Version: version}
case 10:
return &FindCoordinatorResponse{Version: version}
case 11:
return &JoinGroupResponse{Version: version}
case 12:
return &HeartbeatResponse{Version: version}
case 13:
return &LeaveGroupResponse{Version: version}
case 14:
return &SyncGroupResponse{Version: version}
case 15:
return &DescribeGroupsResponse{Version: version}
case 16:
return &ListGroupsResponse{Version: version}
case 17:
return &SaslHandshakeResponse{Version: version}
case 18:
return &ApiVersionsResponse{Version: version}
case 19:
return &CreateTopicsResponse{Version: version}
case 20:
return &DeleteTopicsResponse{Version: version}
case 21:
return &DeleteRecordsResponse{Version: version}
case 22:
return &InitProducerIDResponse{Version: version}
case 24:
return &AddPartitionsToTxnResponse{Version: version}
case 25:
return &AddOffsetsToTxnResponse{Version: version}
case 26:
return &EndTxnResponse{Version: version}
case 28:
return &TxnOffsetCommitResponse{Version: version}
case 29:
return &DescribeAclsResponse{Version: version}
case 30:
return &CreateAclsResponse{Version: version}
case 31:
return &DeleteAclsResponse{Version: version}
case 32:
return &DescribeConfigsResponse{Version: version}
case 33:
return &AlterConfigsResponse{Version: version}
case 35:
return &DescribeLogDirsResponse{Version: version}
case 36:
return &SaslAuthenticateResponse{Version: version}
case 37:
return &CreatePartitionsResponse{Version: version}
case 42:
return &DeleteGroupsResponse{Version: version}
case 44:
return &IncrementalAlterConfigsResponse{Version: version}
case 45:
return &AlterPartitionReassignmentsResponse{Version: version}
case 46:
return &ListPartitionReassignmentsResponse{Version: version}
case 47:
return &DeleteOffsetsResponse{Version: version}
case 48:
return &DescribeClientQuotasResponse{Version: version}
case 49:
return &AlterClientQuotasResponse{Version: version}
case 50:
return &DescribeUserScramCredentialsResponse{Version: version}
case 51:
return &AlterUserScramCredentialsResponse{Version: version}
}
return nil
}

func TestAllocateBodyProtocolVersions(t *testing.T) {
type test struct {
version KafkaVersion
apiVersions map[int16]int16
}

tests := []test{
{
V1_1_0_0,
map[int16]int16{
0: 5,
1: 7,
2: 2,
3: 5,
4: 1,
5: 0,
6: 4,
7: 1,
8: 3,
9: 3,
10: 1,
11: 2,
12: 1,
13: 1,
14: 1,
15: 1,
16: 1,
17: 1,
18: 1,
19: 2,
20: 1,
21: 0,
22: 0,
23: 0,
24: 0,
25: 0,
26: 0,
27: 0,
28: 0,
29: 0,
30: 0,
31: 0,
32: 1,
33: 0,
34: 0,
35: 0,
36: 0,
37: 0,
38: 0,
39: 0,
40: 0,
41: 0,
42: 0,
},
},
}

for _, tt := range tests {
for key, version := range tt.apiVersions {
t.Run(fmt.Sprintf("%s-%s", tt.version.String(), names[key]), func(t *testing.T) {
req := allocateBody(key, version)
if req == nil {
t.Skipf("apikey %d is not implemented", key)
}
resp := allocateResponseBody(req)
assert.NotNil(t, resp, fmt.Sprintf("%s has no matching response type in allocateResponseBody", reflect.TypeOf(req)))
assert.Equal(t, req.isValidVersion(), resp.isValidVersion(), fmt.Sprintf("%s isValidVersion should match %s", reflect.TypeOf(req), reflect.TypeOf(resp)))
for _, body := range []protocolBody{req, resp} {
assert.Equal(t, key, body.key())
assert.Equal(t, version, body.version())
assert.True(t, body.isValidVersion(), fmt.Sprintf("%s v%d is not supported, but expected for KafkaVersion %s", reflect.TypeOf(body), version, tt.version))
assert.True(t, tt.version.IsAtLeast(body.requiredVersion()), fmt.Sprintf("KafkaVersion %s should be enough for %s v%d", tt.version, reflect.TypeOf(body), version))
}
})
}
}
}

// not specific to request tests, just helper functions for testing structures that
// implement the encoder or decoder interfaces that needed somewhere to live

Expand Down

0 comments on commit 7b96841

Please sign in to comment.