Skip to content

Commit

Permalink
Merge pull request #111 from xakep666/feature/protobuf
Browse files Browse the repository at this point in the history
Add protobuf deserialization to consumer and serialization to producer
  • Loading branch information
d-rk authored Dec 3, 2021
2 parents 9cb9f72 + d9b6fb0 commit a02978e
Show file tree
Hide file tree
Showing 21 changed files with 979 additions and 93 deletions.
8 changes: 7 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ linters:
- gofmt
- goimports
- revive
- govet
- govet

issues:
exclude-rules:
- linters:
- staticcheck
text: 'SA1019: package github.com/golang/protobuf/jsonpb is deprecated' # dependency of github.com/jhump/protoreflect, see https://github.com/jhump/protoreflect/issues/463
81 changes: 81 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ A command-line interface for interaction with Apache Kafka
- support for avro schemas
- Configuration of different contexts
- directly access kafka clusters inside your kubernetes cluster
- support for consuming and producing protobuf-encoded messages

[![asciicast](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg)](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr)

Expand Down Expand Up @@ -119,6 +120,16 @@ contexts:
avro:
schemaRegistry: localhost:8081

# optional: default protobuf messages search paths
protobuf:
importPaths:
- "/usr/include/protobuf"
protoFiles:
- "someMessage.proto"
- "otherMessage.proto"
protosetFiles:
- "/usr/include/protoset/other.protoset"

# optional: changes the default partitioner
defaultPartitioner: "hash"

Expand Down Expand Up @@ -303,6 +314,11 @@ The following example prints keys in hex and values in base64:
kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64
```

The consumer can convert protobuf messages to JSON in keys (optional) and values:
```bash
kafkactl consume my-topic --value-proto-type MyTopicValue --key-proto-type MyTopicKey --proto-file kafkamsg.proto
```

### Producing messages

Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`,
Expand Down Expand Up @@ -379,6 +395,11 @@ Producing null values (tombstone record) is also possible:
kafkactl produce my-topic --null-value
```

Producing protobuf message converted from JSON:
```bash
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto
```

### Avro support

In order to enable avro support you just have to add the schema registry to your configuration:
Expand Down Expand Up @@ -421,6 +442,66 @@ The `consume` command handles this automatically and no configuration is needed.

An additional parameter `print-schema` can be provided to display the schema used for decoding.

### Protobuf support

`kafkactl` can consume and produce protobuf-encoded messages. In order to enable protobuf serialization/deserialization
you should add flag `--value-proto-type` and optionally `--key-proto-type` (if keys encoded in protobuf format)
with type name. Protobuf-encoded messages are mapped with [pbjson](https://developers.google.com/protocol-buffers/docs/proto3#json).

`kafkactl` will search messages in following order:
1. Protoset files specified in `--protoset-file` flag
2. Protoset files specified in `context.protobuf.protosetFiles` config value
3. Proto files specified in `--proto-file` flag
4. Proto files specified in `context.protobuf.protoFiles` config value

Proto files may require some dependencies in `import` sections. To specify additional lookup paths use
`--proto-import-path` flag or `context.protobuf.importPaths` config value.

If provided message types was not found `kafkactl` will return error.

Note that if you want to use raw proto files `protoc` installation don't need to be installed.

Also note that protoset files must be compiled with included imports:
```bash
protoc -o kafkamsg.protoset --include_imports kafkamsg.proto
```

#### Example
Assume you have following proto schema in `kafkamsg.proto`:
```protobuf
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message TopicMessage {
google.protobuf.Timestamp produced_at = 1;
int64 num = 2;
}
message TopicKey {
float fvalue = 1;
}
```
"well-known" `google/protobuf` types are included so no additional proto files needed.

To produce message run
```bash
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --proto-file kafkamsg.proto
```
or with protoset
```bash
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --protoset-file kafkamsg.protoset
```

To consume messages run
```bash
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --proto-file kafkamsg.proto
```
or with protoset
```bash
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset
```

### Altering topics

Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
Expand Down
5 changes: 5 additions & 0 deletions cmd/consume/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func NewConsumeCmd() *cobra.Command {
cmdConsume.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml")
cmdConsume.Flags().StringVarP(&flags.EncodeKey, "key-encoding", "", flags.EncodeKey, "key encoding (auto-detected by default). One of: none|hex|base64")
cmdConsume.Flags().StringVarP(&flags.EncodeValue, "value-encoding", "", flags.EncodeValue, "value encoding (auto-detected by default). One of: none|hex|base64")
cmdConsume.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description")
cmdConsume.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive")
cmdConsume.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description")
cmdConsume.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type")
cmdConsume.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type")

return cmdConsume
}
117 changes: 117 additions & 0 deletions cmd/consume/consume_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package consume_test

import (
"encoding/hex"
"fmt"
"path/filepath"
"strings"
"testing"
"time"

"github.com/deviceinsight/kafkactl/internal/helpers/protobuf"

"github.com/deviceinsight/kafkactl/testutil"
"github.com/jhump/protoreflect/dynamic"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestConsumeWithKeyAndValueIntegration(t *testing.T) {
Expand Down Expand Up @@ -179,3 +186,113 @@ func TestAvroDeserializationErrorHandlingIntegration(t *testing.T) {
t.Fatalf("expected consumer to fail")
}
}

func TestProtobufConsumeProtoFileIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

pbTopic := testutil.CreateTopic(t, "proto-file")

kafkaCtl := testutil.CreateKafkaCtlCommand()

protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata")
now := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC)
pbMessageDesc := protobuf.ResolveMessageType(protobuf.SearchContext{
ProtoImportPaths: []string{protoPath},
ProtoFiles: []string{"msg.proto"},
}, "TopicMessage")
pbMessage := dynamic.NewMessage(pbMessageDesc)
pbMessage.SetFieldByNumber(1, timestamppb.New(now))
pbMessage.SetFieldByNumber(2, int64(1))

value, err := pbMessage.Marshal()
if err != nil {
t.Fatalf("Failed to marshal proto message: %s", err)
}

// produce valid pb message
if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", hex.EncodeToString(value), "--value-encoding", "hex", "-H", "key1:value1", "-H", "key\\:2:value\\:2"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "TopicMessage"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut())
}

func TestProtobufConsumeProtosetFileIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

pbTopic := testutil.CreateTopic(t, "proto-file")

kafkaCtl := testutil.CreateKafkaCtlCommand()

protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset")
now := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC)
pbMessageDesc := protobuf.ResolveMessageType(protobuf.SearchContext{
ProtosetFiles: []string{protoPath},
}, "TopicMessage")
pbMessage := dynamic.NewMessage(pbMessageDesc)
pbMessage.SetFieldByNumber(1, timestamppb.New(now))
pbMessage.SetFieldByNumber(2, int64(1))

value, err := pbMessage.Marshal()
if err != nil {
t.Fatalf("Failed to marshal proto message: %s", err)
}

// produce valid pb message
if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", hex.EncodeToString(value), "--value-encoding", "hex"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--protoset-file", protoPath, "--value-proto-type", "TopicMessage"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut())
}

func TestProtobufConsumeProtoFileErrNoMessageIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

pbTopic := testutil.CreateTopic(t, "proto-file")

kafkaCtl := testutil.CreateKafkaCtlCommand()

protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset")

if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "NonExisting"); err != nil {
testutil.AssertErrorContains(t, "not found in provided files", err)
} else {
t.Fatal("Expected consumer to fail")
}
}

func TestProtobufConsumeProtoFileErrDecodeIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

pbTopic := testutil.CreateTopic(t, "proto-file")

kafkaCtl := testutil.CreateKafkaCtlCommand()

protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata")

// produce invalid pb message
if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", "nonpb"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "TopicMessage"); err != nil {
testutil.AssertErrorContains(t, "value decode failed: proto: bad wiretype", err)
} else {
t.Fatal("Expected consumer to fail")
}
}
5 changes: 5 additions & 0 deletions cmd/produce/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func NewProduceCmd() *cobra.Command {
cmdProduce.Flags().StringVarP(&flags.ValueEncoding, "value-encoding", "", flags.ValueEncoding, "value encoding (none by default). One of: none|hex|base64")
cmdProduce.Flags().BoolVarP(&flags.Silent, "silent", "s", false, "do not write to standard output")
cmdProduce.Flags().IntVarP(&flags.RateInSeconds, "rate", "r", -1, "amount of messages per second to produce on the topic")
cmdProduce.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description")
cmdProduce.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive")
cmdProduce.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description")
cmdProduce.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type")
cmdProduce.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type")

return cmdProduce
}
Loading

0 comments on commit a02978e

Please sign in to comment.