From d9b6fb0e7ad5e015a2de009471a2ca72c809d869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=82=D0=BE=D0=BB=D1=8F=D1=80=D0=BE=D0=B2=20=D0=92?= =?UTF-8?q?=D0=BB=D0=B0=D0=B4=D0=B8=D0=BC=D0=B8=D1=80=20=D0=90=D0=BB=D0=B5?= =?UTF-8?q?=D0=BA=D1=81=D0=B5=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 1 Dec 2021 16:45:23 +0300 Subject: [PATCH] Add protobuf deserialization to consumer and serialization to producer --- .golangci.yml | 8 +- README.md | 81 ++++++++ cmd/consume/consume.go | 5 + cmd/consume/consume_test.go | 117 ++++++++++++ cmd/produce/produce.go | 5 + cmd/produce/produce_test.go | 178 ++++++++++++++++++ cmd/root.go | 34 ++-- cmd/root_test.go | 8 + go.mod | 8 +- go.sum | 77 +++++--- internal/common-operation.go | 5 + internal/consumer/MessageDeserializerChain.go | 40 ++++ .../consumer/ProtobufMessageDeserializer.go | 149 +++++++++++++++ internal/consumer/consumer-operation.go | 64 ++++--- internal/helpers/protobuf/protobuf.go | 89 +++++++++ internal/producer/MessageSerializerChain.go | 47 +++++ .../producer/ProtobufMessageSerializer.go | 84 +++++++++ internal/producer/producer-operation.go | 40 ++-- testutil/test_util.go | 4 + testutil/testdata/msg.proto | 14 ++ testutil/testdata/msg.protoset | 15 ++ 21 files changed, 979 insertions(+), 93 deletions(-) create mode 100644 internal/consumer/MessageDeserializerChain.go create mode 100644 internal/consumer/ProtobufMessageDeserializer.go create mode 100644 internal/helpers/protobuf/protobuf.go create mode 100644 internal/producer/MessageSerializerChain.go create mode 100644 internal/producer/ProtobufMessageSerializer.go create mode 100644 testutil/testdata/msg.proto create mode 100644 testutil/testdata/msg.protoset diff --git a/.golangci.yml b/.golangci.yml index afd5d39..7778afc 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,4 +4,10 @@ linters: - gofmt - goimports - revive - - govet \ No newline at end of file + - govet + +issues: + exclude-rules: + - linters: + - staticcheck + text: 'SA1019: package github.com/golang/protobuf/jsonpb is deprecated' # dependency of github.com/jhump/protoreflect, see https://github.com/jhump/protoreflect/issues/463 diff --git a/README.md b/README.md index fbb107a..38474d0 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ A command-line interface for interaction with Apache Kafka - support for avro schemas - Configuration of different contexts - directly access kafka clusters inside your kubernetes cluster +- support for consuming and producing protobuf-encoded messages [![asciicast](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg)](https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr) @@ -119,6 +120,16 @@ contexts: avro: schemaRegistry: localhost:8081 + # optional: default protobuf messages search paths + protobuf: + importPaths: + - "/usr/include/protobuf" + protoFiles: + - "someMessage.proto" + - "otherMessage.proto" + protosetFiles: + - "/usr/include/protoset/other.protoset" + # optional: changes the default partitioner defaultPartitioner: "hash" @@ -303,6 +314,11 @@ The following example prints keys in hex and values in base64: kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64 ``` +The consumer can convert protobuf messages to JSON in keys (optional) and values: +```bash +kafkactl consume my-topic --value-proto-type MyTopicValue --key-proto-type MyTopicKey --proto-file kafkamsg.proto +``` + ### Producing messages Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`, @@ -379,6 +395,11 @@ Producing null values (tombstone record) is also possible: kafkactl produce my-topic --null-value ``` +Producing protobuf message converted from JSON: +```bash +kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto +``` + ### Avro support In order to enable avro support you just have to add the schema registry to your configuration: @@ -421,6 +442,66 @@ The `consume` command handles this automatically and no configuration is needed. An additional parameter `print-schema` can be provided to display the schema used for decoding. +### Protobuf support + +`kafkactl` can consume and produce protobuf-encoded messages. In order to enable protobuf serialization/deserialization +you should add flag `--value-proto-type` and optionally `--key-proto-type` (if keys encoded in protobuf format) +with type name. Protobuf-encoded messages are mapped with [pbjson](https://developers.google.com/protocol-buffers/docs/proto3#json). + +`kafkactl` will search messages in following order: +1. Protoset files specified in `--protoset-file` flag +2. Protoset files specified in `context.protobuf.protosetFiles` config value +3. Proto files specified in `--proto-file` flag +4. Proto files specified in `context.protobuf.protoFiles` config value + +Proto files may require some dependencies in `import` sections. To specify additional lookup paths use +`--proto-import-path` flag or `context.protobuf.importPaths` config value. + +If provided message types was not found `kafkactl` will return error. + +Note that if you want to use raw proto files `protoc` installation don't need to be installed. + +Also note that protoset files must be compiled with included imports: +```bash +protoc -o kafkamsg.protoset --include_imports kafkamsg.proto +``` + +#### Example +Assume you have following proto schema in `kafkamsg.proto`: +```protobuf +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message TopicMessage { + google.protobuf.Timestamp produced_at = 1; + int64 num = 2; +} + +message TopicKey { + float fvalue = 1; +} +``` +"well-known" `google/protobuf` types are included so no additional proto files needed. + +To produce message run +```bash +kafkactl produce --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --proto-file kafkamsg.proto +``` +or with protoset +```bash +kafkactl produce --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --protoset-file kafkamsg.protoset +``` + +To consume messages run +```bash +kafkactl consume --key-proto-type TopicKey --value-proto-type TopicValue --proto-file kafkamsg.proto +``` +or with protoset +```bash +kafkactl consume --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset +``` + ### Altering topics Using the `alter topic` command allows you to change the partition count, replication factor and topic-level diff --git a/cmd/consume/consume.go b/cmd/consume/consume.go index a83c857..73ea137 100644 --- a/cmd/consume/consume.go +++ b/cmd/consume/consume.go @@ -39,6 +39,11 @@ func NewConsumeCmd() *cobra.Command { cmdConsume.Flags().StringVarP(&flags.OutputFormat, "output", "o", flags.OutputFormat, "output format. One of: json|yaml") cmdConsume.Flags().StringVarP(&flags.EncodeKey, "key-encoding", "", flags.EncodeKey, "key encoding (auto-detected by default). One of: none|hex|base64") cmdConsume.Flags().StringVarP(&flags.EncodeValue, "value-encoding", "", flags.EncodeValue, "value encoding (auto-detected by default). One of: none|hex|base64") + cmdConsume.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description") + cmdConsume.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive") + cmdConsume.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description") + cmdConsume.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type") + cmdConsume.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type") return cmdConsume } diff --git a/cmd/consume/consume_test.go b/cmd/consume/consume_test.go index a95031e..66f719e 100644 --- a/cmd/consume/consume_test.go +++ b/cmd/consume/consume_test.go @@ -1,11 +1,18 @@ package consume_test import ( + "encoding/hex" "fmt" + "path/filepath" "strings" "testing" + "time" + + "github.com/deviceinsight/kafkactl/internal/helpers/protobuf" "github.com/deviceinsight/kafkactl/testutil" + "github.com/jhump/protoreflect/dynamic" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestConsumeWithKeyAndValueIntegration(t *testing.T) { @@ -179,3 +186,113 @@ func TestAvroDeserializationErrorHandlingIntegration(t *testing.T) { t.Fatalf("expected consumer to fail") } } + +func TestProtobufConsumeProtoFileIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "proto-file") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata") + now := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC) + pbMessageDesc := protobuf.ResolveMessageType(protobuf.SearchContext{ + ProtoImportPaths: []string{protoPath}, + ProtoFiles: []string{"msg.proto"}, + }, "TopicMessage") + pbMessage := dynamic.NewMessage(pbMessageDesc) + pbMessage.SetFieldByNumber(1, timestamppb.New(now)) + pbMessage.SetFieldByNumber(2, int64(1)) + + value, err := pbMessage.Marshal() + if err != nil { + t.Fatalf("Failed to marshal proto message: %s", err) + } + + // produce valid pb message + if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", hex.EncodeToString(value), "--value-encoding", "hex", "-H", "key1:value1", "-H", "key\\:2:value\\:2"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "TopicMessage"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut()) +} + +func TestProtobufConsumeProtosetFileIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "proto-file") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset") + now := time.Date(2021, time.December, 1, 14, 10, 12, 0, time.UTC) + pbMessageDesc := protobuf.ResolveMessageType(protobuf.SearchContext{ + ProtosetFiles: []string{protoPath}, + }, "TopicMessage") + pbMessage := dynamic.NewMessage(pbMessageDesc) + pbMessage.SetFieldByNumber(1, timestamppb.New(now)) + pbMessage.SetFieldByNumber(2, int64(1)) + + value, err := pbMessage.Marshal() + if err != nil { + t.Fatalf("Failed to marshal proto message: %s", err) + } + + // produce valid pb message + if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", hex.EncodeToString(value), "--value-encoding", "hex"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--protoset-file", protoPath, "--value-proto-type", "TopicMessage"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}`, kafkaCtl.GetStdOut()) +} + +func TestProtobufConsumeProtoFileErrNoMessageIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "proto-file") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset") + + if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "NonExisting"); err != nil { + testutil.AssertErrorContains(t, "not found in provided files", err) + } else { + t.Fatal("Expected consumer to fail") + } +} + +func TestProtobufConsumeProtoFileErrDecodeIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "proto-file") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + // produce invalid pb message + if _, err := kafkaCtl.Execute("produce", pbTopic, "--key", "test-key", "--value", "nonpb"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--proto-import-path", protoPath, "--proto-file", "msg.proto", "--value-proto-type", "TopicMessage"); err != nil { + testutil.AssertErrorContains(t, "value decode failed: proto: bad wiretype", err) + } else { + t.Fatal("Expected consumer to fail") + } +} diff --git a/cmd/produce/produce.go b/cmd/produce/produce.go index 6b4add6..202aa36 100644 --- a/cmd/produce/produce.go +++ b/cmd/produce/produce.go @@ -42,6 +42,11 @@ func NewProduceCmd() *cobra.Command { cmdProduce.Flags().StringVarP(&flags.ValueEncoding, "value-encoding", "", flags.ValueEncoding, "value encoding (none by default). One of: none|hex|base64") cmdProduce.Flags().BoolVarP(&flags.Silent, "silent", "s", false, "do not write to standard output") cmdProduce.Flags().IntVarP(&flags.RateInSeconds, "rate", "r", -1, "amount of messages per second to produce on the topic") + cmdProduce.Flags().StringSliceVarP(&flags.ProtoFiles, "proto-file", "", flags.ProtoFiles, "additional protobuf description file for searching message description") + cmdProduce.Flags().StringSliceVarP(&flags.ProtoImportPaths, "proto-import-path", "", flags.ProtoImportPaths, "additional path to search files listed in proto 'import' directive") + cmdProduce.Flags().StringSliceVarP(&flags.ProtosetFiles, "protoset-file", "", flags.ProtosetFiles, "additional compiled protobuf description file for searching message description") + cmdProduce.Flags().StringVarP(&flags.KeyProtoType, "key-proto-type", "", flags.KeyProtoType, "key protobuf message type") + cmdProduce.Flags().StringVarP(&flags.ValueProtoType, "value-proto-type", "", flags.ValueProtoType, "value protobuf message type") return cmdProduce } diff --git a/cmd/produce/produce_test.go b/cmd/produce/produce_test.go index a4d1dcc..3505dcc 100644 --- a/cmd/produce/produce_test.go +++ b/cmd/produce/produce_test.go @@ -1,10 +1,16 @@ package produce_test import ( + "encoding/hex" "fmt" + "path/filepath" "strings" "testing" + "github.com/deviceinsight/kafkactl/internal/helpers/protobuf" + + "github.com/jhump/protoreflect/dynamic" + "github.com/deviceinsight/kafkactl/testutil" ) @@ -174,3 +180,175 @@ func TestProduceAutoCompletionIntegration(t *testing.T) { testutil.AssertContains(t, topicName2, outputLines) testutil.AssertContains(t, topicName3, outputLines) } + +func TestProduceProtoFileIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "produce-topic-pb") + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + key := `{"fvalue":1.2}` + value := `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}` + + if _, err := kafkaCtl.Execute("produce", pbTopic, + "--key", key, "--key-proto-type", "TopicKey", + "--value", value, "--value-proto-type", "TopicMessage", + "--proto-import-path", protoPath, "--proto-file", "msg.proto"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--print-keys", "--key-encoding", "hex", "--value-encoding", "hex"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + kv := strings.Split(kafkaCtl.GetStdOut(), "#") + + rawKey, err := hex.DecodeString(strings.TrimSpace(kv[0])) + if err != nil { + t.Fatalf("Failed to decode key: %s", err) + } + + rawValue, err := hex.DecodeString(strings.TrimSpace(kv[1])) + if err != nil { + t.Fatalf("Failed to decode value: %s", err) + } + + keyMessage := dynamic.NewMessage(protobuf.ResolveMessageType(protobuf.SearchContext{ + ProtoImportPaths: []string{protoPath}, + ProtoFiles: []string{"msg.proto"}, + }, "TopicKey")) + valueMessage := dynamic.NewMessage(protobuf.ResolveMessageType(protobuf.SearchContext{ + ProtoImportPaths: []string{protoPath}, + ProtoFiles: []string{"msg.proto"}, + }, "TopicMessage")) + + if err = keyMessage.Unmarshal(rawKey); err != nil { + t.Fatalf("Unmarshal key failed: %s", err) + } + if err = valueMessage.Unmarshal(rawValue); err != nil { + t.Fatalf("Unmarshal value failed: %s", err) + } + + actualKey, err := keyMessage.MarshalJSON() + if err != nil { + t.Fatalf("Key to json failed: %s", err) + } + + actualValue, err := valueMessage.MarshalJSON() + if err != nil { + t.Fatalf("Value to json failed: %s", err) + } + + testutil.AssertEquals(t, key, string(actualKey)) + testutil.AssertEquals(t, value, string(actualValue)) +} + +func TestProduceProtosetFileIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "produce-topic-pb") + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata", "msg.protoset") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + key := `{"fvalue":1.2}` + value := `{"producedAt":"2021-12-01T14:10:12Z","num":"1"}` + + if _, err := kafkaCtl.Execute("produce", pbTopic, + "--key", key, "--key-proto-type", "TopicKey", + "--value", value, "--value-proto-type", "TopicMessage", + "--protoset-file", protoPath); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", pbTopic, "--from-beginning", "--exit", "--print-keys", "--key-encoding", "hex", "--value-encoding", "hex"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + kv := strings.Split(kafkaCtl.GetStdOut(), "#") + + rawKey, err := hex.DecodeString(strings.TrimSpace(kv[0])) + if err != nil { + t.Fatalf("Failed to decode key: %s", err) + } + + rawValue, err := hex.DecodeString(strings.TrimSpace(kv[1])) + if err != nil { + t.Fatalf("Failed to decode value: %s", err) + } + + keyMessage := dynamic.NewMessage(protobuf.ResolveMessageType(protobuf.SearchContext{ + ProtosetFiles: []string{protoPath}, + }, "TopicKey")) + valueMessage := dynamic.NewMessage(protobuf.ResolveMessageType(protobuf.SearchContext{ + ProtosetFiles: []string{protoPath}, + }, "TopicMessage")) + + if err = keyMessage.Unmarshal(rawKey); err != nil { + t.Fatalf("Unmarshal key failed: %s", err) + } + if err = valueMessage.Unmarshal(rawValue); err != nil { + t.Fatalf("Unmarshal value failed: %s", err) + } + + actualKey, err := keyMessage.MarshalJSON() + if err != nil { + t.Fatalf("Key to json failed: %s", err) + } + + actualValue, err := valueMessage.MarshalJSON() + if err != nil { + t.Fatalf("Value to json failed: %s", err) + } + + testutil.AssertEquals(t, key, string(actualKey)) + testutil.AssertEquals(t, value, string(actualValue)) +} + +func TestProduceProtoFileBadJSONIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "produce-topic-pb") + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + value := `{"producedAt":"2021-12-01T14:10:1` + + if _, err := kafkaCtl.Execute("produce", pbTopic, + "--value", value, "--value-proto-type", "TopicMessage", + "--proto-import-path", protoPath, "--proto-file", "msg.proto"); err != nil { + testutil.AssertErrorContains(t, "invalid json", err) + } else { + t.Fatalf("Expected producer to fail") + } +} + +func TestProduceProtoFileErrNoMessageIntegration(t *testing.T) { + testutil.StartIntegrationTest(t) + + pbTopic := testutil.CreateTopic(t, "produce-topic-pb") + + protoPath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + kafkaCtl := testutil.CreateKafkaCtlCommand() + + value := `{"producedAt":"2021-12-01T14:10:1` + + if _, err := kafkaCtl.Execute("produce", pbTopic, + "--value", value, "--value-proto-type", "unknown", + "--proto-import-path", protoPath, "--proto-file", "msg.proto"); err != nil { + testutil.AssertErrorContains(t, "not found in provided files", err) + } else { + t.Fatalf("Expected producer to fail") + } +} diff --git a/cmd/root.go b/cmd/root.go index 327e941..cf1f632 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -27,21 +27,25 @@ var cfgFile string var Verbose bool var envMapping = map[string]string{ - "REQUESTTIMEOUT": "CONTEXTS_DEFAULT_REQUESTTIMEOUT", - "BROKERS": "CONTEXTS_DEFAULT_BROKERS", - "TLS_ENABLED": "CONTEXTS_DEFAULT_TLS_ENABLED", - "TLS_CA": "CONTEXTS_DEFAULT_TLS_CA", - "TLS_CERT": "CONTEXTS_DEFAULT_TLS_CERT", - "TLS_CERTKEY": "CONTEXTS_DEFAULT_TLS_CERTKEY", - "TLS_INSECURE": "CONTEXTS_DEFAULT_TLS_INSECURE", - "SASL_ENABLED": "CONTEXTS_DEFAULT_SASL_ENABLED", - "SASL_USERNAME": "CONTEXTS_DEFAULT_SASL_USERNAME", - "SASL_PASSWORD": "CONTEXTS_DEFAULT_SASL_PASSWORD", - "SASL_MECHANISM": "CONTEXTS_DEFAULT_SASL_MECHANISM", - "CLIENTID": "CONTEXTS_DEFAULT_CLIENTID", - "KAFKAVERSION": "CONTEXTS_DEFAULT_KAFKAVERSION", - "AVRO_SCHEMAREGISTRY": "CONTEXTS_DEFAULT_AVRO_SCHEMAREGISTRY", - "DEFAULTPARTITIONER": "CONTEXTS_DEFAULT_DEFAULTPARTITIONER", + "REQUESTTIMEOUT": "CONTEXTS_DEFAULT_REQUESTTIMEOUT", + "BROKERS": "CONTEXTS_DEFAULT_BROKERS", + "TLS_ENABLED": "CONTEXTS_DEFAULT_TLS_ENABLED", + "TLS_CA": "CONTEXTS_DEFAULT_TLS_CA", + "TLS_CERT": "CONTEXTS_DEFAULT_TLS_CERT", + "TLS_CERTKEY": "CONTEXTS_DEFAULT_TLS_CERTKEY", + "TLS_INSECURE": "CONTEXTS_DEFAULT_TLS_INSECURE", + "SASL_ENABLED": "CONTEXTS_DEFAULT_SASL_ENABLED", + "SASL_USERNAME": "CONTEXTS_DEFAULT_SASL_USERNAME", + "SASL_PASSWORD": "CONTEXTS_DEFAULT_SASL_PASSWORD", + "SASL_MECHANISM": "CONTEXTS_DEFAULT_SASL_MECHANISM", + "CLIENTID": "CONTEXTS_DEFAULT_CLIENTID", + "KAFKAVERSION": "CONTEXTS_DEFAULT_KAFKAVERSION", + "AVRO_SCHEMAREGISTRY": "CONTEXTS_DEFAULT_AVRO_SCHEMAREGISTRY", + "PROTOBUF_PROTOSETFILES": "CONTEXTS_DEFAULT_PROTOBUF_PROTOSETFILES", + "PROTOBUF_PROTOSETPATHS": "CONTEXTS_DEFAULT_PROTOBUF_PROTOSETPATHS", + "PROTOBUF_IMPORTPATHS": "CONTEXTS_DEFAULT_PROTOBUF_IMPORTPATHS", + "PROTOBUF_PROTOFILES": "CONTEXTS_DEFAULT_PROTOBUF_PROTOFILES", + "DEFAULTPARTITIONER": "CONTEXTS_DEFAULT_DEFAULTPARTITIONER", } var configPaths = []string{"$HOME/.config/kafkactl", "$HOME/.kafkactl", "$SNAP_REAL_HOME/.config/kafkactl", "$SNAP_DATA/kafkactl", "/etc/kafkactl"} diff --git a/cmd/root_test.go b/cmd/root_test.go index fdb20f3..bb70ac8 100644 --- a/cmd/root_test.go +++ b/cmd/root_test.go @@ -54,6 +54,10 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) { _ = os.Setenv("CLIENTID", "my-client") _ = os.Setenv("KAFKAVERSION", "2.0.1") _ = os.Setenv("AVRO_SCHEMAREGISTRY", "registry:8888") + _ = os.Setenv("PROTOBUF_PROTOSETFILES", "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset") + _ = os.Setenv("PROTOBUF_PROTOSETPATHS", "/usr/include/protosets /usr/lib/protosets") + _ = os.Setenv("PROTOBUF_IMPORTPATHS", "/usr/include/protobuf /usr/lib/protobuf") + _ = os.Setenv("PROTOBUF_PROTOFILES", "message.proto other.proto") _ = os.Setenv("DEFAULTPARTITIONER", "hash") kafkaCtl := testutil.CreateKafkaCtlCommand() @@ -81,5 +85,9 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) { testutil.AssertEquals(t, "my-client", viper.GetString("contexts.default.clientID")) testutil.AssertEquals(t, "2.0.1", viper.GetString("contexts.default.kafkaVersion")) testutil.AssertEquals(t, "registry:8888", viper.GetString("contexts.default.avro.schemaRegistry")) + testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset", viper.GetStringSlice("contexts.default.protobuf.protosetFiles")[0]) + testutil.AssertEquals(t, "/usr/include/protosets", viper.GetStringSlice("contexts.default.protobuf.protosetPaths")[0]) + testutil.AssertEquals(t, "/usr/include/protobuf", viper.GetStringSlice("contexts.default.protobuf.importPaths")[0]) + testutil.AssertEquals(t, "message.proto", viper.GetStringSlice("contexts.default.protobuf.protoFiles")[0]) testutil.AssertEquals(t, "hash", viper.GetString("contexts.default.defaultPartitioner")) } diff --git a/go.mod b/go.mod index 3300cc8..2dfc197 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,13 @@ require ( github.com/Rican7/retry v0.1.0 github.com/Shopify/sarama v1.30.0 github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/golang/protobuf v1.4.2 github.com/google/go-cmp v0.5.5 // indirect + github.com/jhump/protoreflect v1.10.1 github.com/landoop/schema-registry v0.0.0-20190327143759-50a5701c1891 github.com/linkedin/goavro/v2 v2.10.0 github.com/magiconair/properties v1.8.4 // indirect github.com/mitchellh/mapstructure v1.3.3 // indirect - github.com/onsi/ginkgo v1.8.0 // indirect - github.com/onsi/gomega v1.4.3 // indirect github.com/pelletier/go-toml v1.8.1 // indirect github.com/pkg/errors v0.9.1 github.com/spf13/afero v1.4.0 // indirect @@ -24,12 +24,10 @@ require ( go.uber.org/atomic v1.5.1 // indirect go.uber.org/ratelimit v0.1.0 golang.org/x/tools v0.1.0 // indirect - gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect + google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12 gopkg.in/errgo.v2 v2.1.0 - gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect gopkg.in/ini.v1 v1.61.0 // indirect gopkg.in/yaml.v2 v2.3.0 - gotest.tools/gotestsum v1.7.0 // indirect ) go 1.16 diff --git a/go.sum b/go.sum index c8c65b6..7c16fa2 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,7 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -48,17 +49,15 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= -github.com/dnephin/pflag v1.0.7 h1:oxONGlWxhmUct0YzKTgrpQv9AUA1wtPBn7zuSjJqptk= -github.com/dnephin/pflag v1.0.7/go.mod h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE= github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= -github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= @@ -82,6 +81,14 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -89,7 +96,9 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -97,12 +106,11 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= -github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= @@ -134,7 +142,6 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -149,9 +156,9 @@ github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJz github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jhump/protoreflect v1.10.1 h1:iH+UZfsbRE6vpyZH7asAjTPWJf7RJbpZ9j/N3lDlKs0= +github.com/jhump/protoreflect v1.10.1/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= -github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= -github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= @@ -180,11 +187,7 @@ github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP github.com/magiconair/properties v1.8.4 h1:8KGKTcQQGm0Kv7vEbKFErAoAOFyyacLStRtQSeYtvkY= github.com/magiconair/properties v1.8.4/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= -github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= -github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -200,10 +203,8 @@ github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.8.1 h1:1Nf83orprkJyknT6h7zbuEGUEjcyVlCxSUGTENmNCRM= @@ -222,6 +223,7 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -284,6 +286,8 @@ github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4= github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -325,11 +329,10 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -344,6 +347,8 @@ golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ= @@ -356,12 +361,12 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -374,15 +379,13 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -404,9 +407,9 @@ golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= -golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -414,6 +417,9 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -439,11 +445,25 @@ google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= -gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12 h1:OwhZOOMuf7leLaSCuxtQ9FW7ui2L2L6UKOtKAUqovUQ= +google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -451,8 +471,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.61.0 h1:LBCdW4FmFYL4s/vDZD1RQYX7oAR6IjujCYgMdbHBR10= gopkg.in/ini.v1 v1.61.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= @@ -468,11 +486,10 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools/gotestsum v1.7.0 h1:RwpqwwFKBAa2h+F6pMEGpE707Edld0etUD3GhqqhDNc= -gotest.tools/gotestsum v1.7.0/go.mod h1:V1m4Jw3eBerhI/A6qCxUE07RnCg7ACkKj9BYcAm09V8= -gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= +honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/internal/common-operation.go b/internal/common-operation.go index 64bb886..01bb849 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -12,6 +12,7 @@ import ( "github.com/Shopify/sarama" "github.com/deviceinsight/kafkactl/internal/helpers" + "github.com/deviceinsight/kafkactl/internal/helpers/protobuf" "github.com/deviceinsight/kafkactl/output" "github.com/pkg/errors" "github.com/spf13/viper" @@ -54,6 +55,7 @@ type ClientContext struct { ClientID string KafkaVersion sarama.KafkaVersion AvroSchemaRegistry string + Protobuf protobuf.SearchContext DefaultPartitioner string RequiredAcks string } @@ -95,6 +97,9 @@ func CreateClientContext() (ClientContext, error) { return context, err } context.AvroSchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry") + context.Protobuf.ProtosetFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protosetFiles") + context.Protobuf.ProtoImportPaths = viper.GetStringSlice("contexts." + context.Name + ".protobuf.importPaths") + context.Protobuf.ProtoFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protoFiles") context.DefaultPartitioner = viper.GetString("contexts." + context.Name + ".defaultPartitioner") context.RequiredAcks = viper.GetString("contexts." + context.Name + ".requiredAcks") context.Sasl.Enabled = viper.GetBool("contexts." + context.Name + ".sasl.enabled") diff --git a/internal/consumer/MessageDeserializerChain.go b/internal/consumer/MessageDeserializerChain.go new file mode 100644 index 0000000..6a1e7ae --- /dev/null +++ b/internal/consumer/MessageDeserializerChain.go @@ -0,0 +1,40 @@ +package consumer + +import ( + "github.com/Shopify/sarama" + "github.com/pkg/errors" +) + +type MessageDeserializerChain []MessageDeserializer + +func (deserializer MessageDeserializerChain) Deserialize(msg *sarama.ConsumerMessage, flags Flags) error { + for _, d := range deserializer { + canDeserialize, err := d.CanDeserialize(msg.Topic) + if err != nil { + return err + } + + if !canDeserialize { + continue + } + + return d.Deserialize(msg, flags) + } + + return errors.Errorf("can't find suitable deserializer") +} + +func (deserializer MessageDeserializerChain) CanDeserialize(topic string) (bool, error) { + for _, d := range deserializer { + canDeserialize, err := d.CanDeserialize(topic) + if err != nil { + return false, err + } + + if canDeserialize { + return true, nil + } + } + + return false, nil +} diff --git a/internal/consumer/ProtobufMessageDeserializer.go b/internal/consumer/ProtobufMessageDeserializer.go new file mode 100644 index 0000000..8974595 --- /dev/null +++ b/internal/consumer/ProtobufMessageDeserializer.go @@ -0,0 +1,149 @@ +package consumer + +import ( + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/deviceinsight/kafkactl/internal/helpers/protobuf" + "github.com/deviceinsight/kafkactl/output" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/dynamic" + "github.com/pkg/errors" +) + +type ProtobufMessageDeserializer struct { + keyDescriptor *desc.MessageDescriptor + valueDescriptor *desc.MessageDescriptor +} + +func CreateProtobufMessageDeserializer(context protobuf.SearchContext, keyType, valueType string) (*ProtobufMessageDeserializer, error) { + valueDescriptor := protobuf.ResolveMessageType(context, valueType) + if valueDescriptor == nil { + return nil, errors.Errorf("value message type %q not found in provided files", valueType) + } + + keyDescriptor := protobuf.ResolveMessageType(context, keyType) + if valueDescriptor == nil && keyType != "" { + return nil, errors.Errorf("key message type %q not found in provided files", valueType) + } + + return &ProtobufMessageDeserializer{ + keyDescriptor: keyDescriptor, + valueDescriptor: valueDescriptor, + }, nil +} + +type protobufMessage struct { + Partition int32 + Offset int64 + Headers map[string]string `json:",omitempty" yaml:",omitempty"` + Key *string `json:",omitempty" yaml:",omitempty"` + Value *string + Timestamp *time.Time `json:",omitempty" yaml:",omitempty"` +} + +func (deserializer ProtobufMessageDeserializer) newProtobufMessage(msg *sarama.ConsumerMessage, flags Flags) (protobufMessage, error) { + var err error + + ret := protobufMessage{ + Partition: msg.Partition, + Offset: msg.Offset, + } + + if flags.PrintTimestamps && !msg.Timestamp.IsZero() { + ret.Timestamp = &msg.Timestamp + } + + if flags.PrintHeaders { + ret.Headers = encodeRecordHeaders(msg.Headers) + } + + ret.Value, err = decodeProtobuf(msg.Value, deserializer.valueDescriptor, flags.EncodeValue) + if err != nil { + return protobufMessage{}, errors.Wrap(err, "value decode failed") + } + + if flags.PrintKeys { + if deserializer.keyDescriptor != nil { + ret.Key, err = decodeProtobuf(msg.Key, deserializer.keyDescriptor, flags.EncodeKey) + if err != nil { + return protobufMessage{}, errors.Wrap(err, "key decode failed") + } + } else { + ret.Key = encodeBytes(msg.Key, flags.EncodeKey) + } + } + + return ret, nil +} + +func (deserializer ProtobufMessageDeserializer) Deserialize(rawMsg *sarama.ConsumerMessage, flags Flags) error { + output.Debugf("start to deserialize protobuf message...") + + msg, err := deserializer.newProtobufMessage(rawMsg, flags) + if err != nil { + return err + } + + if flags.OutputFormat != "" { + return output.PrintObject(msg, flags.OutputFormat) + } + + var row []string + + if flags.PrintHeaders { + if msg.Headers != nil { + column := toSortedArray(msg.Headers) + row = append(row, strings.Join(column[:], ",")) + } else { + row = append(row, "") + } + } + + if flags.PrintKeys { + if msg.Key != nil { + row = append(row, *msg.Key) + } else { + row = append(row, "") + } + } + if flags.PrintTimestamps { + if msg.Timestamp != nil { + row = append(row, (*msg.Timestamp).Format(time.RFC3339)) + } else { + row = append(row, "") + } + } + + var value string + + if msg.Value != nil { + value = *msg.Value + } else { + value = "null" + } + + row = append(row, value) + + output.PrintStrings(strings.Join(row[:], flags.Separator)) + return nil +} + +func (deserializer ProtobufMessageDeserializer) CanDeserialize(_ string) (bool, error) { + return true, nil +} + +func decodeProtobuf(b []byte, msgDesc *desc.MessageDescriptor, encoding string) (*string, error) { + msg := dynamic.NewMessage(msgDesc) + if err := msg.Unmarshal(b); err != nil { + return nil, err + } + + j, err := msg.MarshalJSON() + if err != nil { + return nil, err + } + + return encodeBytes(j, encoding), nil +} diff --git a/internal/consumer/consumer-operation.go b/internal/consumer/consumer-operation.go index c0a210b..344553e 100644 --- a/internal/consumer/consumer-operation.go +++ b/internal/consumer/consumer-operation.go @@ -17,19 +17,24 @@ import ( ) type Flags struct { - PrintKeys bool - PrintTimestamps bool - PrintAvroSchema bool - PrintHeaders bool - OutputFormat string - Separator string - Partitions []int - Offsets []string - FromBeginning bool - Tail int - Exit bool - EncodeValue string - EncodeKey string + PrintKeys bool + PrintTimestamps bool + PrintAvroSchema bool + PrintHeaders bool + OutputFormat string + Separator string + Partitions []int + Offsets []string + FromBeginning bool + Tail int + Exit bool + EncodeValue string + EncodeKey string + ProtoFiles []string + ProtoImportPaths []string + ProtosetFiles []string + KeyProtoType string + ValueProtoType string } type ConsumedMessage struct { @@ -73,28 +78,33 @@ func (operation *Operation) Consume(topic string, flags Flags) error { return errors.Wrap(err, "Failed to start consumer: ") } - var deserializer MessageDeserializer + var deserializers MessageDeserializerChain if clientContext.AvroSchemaRegistry != "" { - deserializer, err = CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry) + deserializer, err := CreateAvroMessageDeserializer(topic, clientContext.AvroSchemaRegistry) if err != nil { return err } - if canDeserialize, err := deserializer.CanDeserialize(topic); err != nil { + + deserializers = append(deserializers, deserializer) + } + + if flags.ValueProtoType != "" { + context := clientContext.Protobuf + context.ProtosetFiles = append(flags.ProtosetFiles, context.ProtosetFiles...) + context.ProtoFiles = append(flags.ProtoFiles, context.ProtoFiles...) + context.ProtoImportPaths = append(flags.ProtoImportPaths, context.ProtoImportPaths...) + + deserializer, err := CreateProtobufMessageDeserializer(context, flags.KeyProtoType, flags.ValueProtoType) + if err != nil { return err - } else if !canDeserialize { - output.Debugf("no avro topic") - deserializer = nil - } else { - output.Debugf("using AvroMessageDeserializer") } - } - if deserializer == nil { - output.Debugf("using DefaultMessageDeserializer") - deserializer = DefaultMessageDeserializer{} + deserializers = append(deserializers, deserializer) } + deserializers = append(deserializers, DefaultMessageDeserializer{}) + var partitions []int32 if flags.Partitions == nil || len(flags.Partitions) == 0 { @@ -200,7 +210,7 @@ func (operation *Operation) Consume(topic string, flags Flags) error { } lastIndex := len(sortedMessages) - 1 for i := range sortedMessages { - err := deserializer.Deserialize(sortedMessages[lastIndex-i], flags) + err := deserializers.Deserialize(sortedMessages[lastIndex-i], flags) if err != nil { recordFirstError(errChannel, err) return @@ -213,7 +223,7 @@ func (operation *Operation) Consume(topic string, flags Flags) error { go func() { defer wgPendingMessages.Done() for msg := range messages { - err := deserializer.Deserialize(msg, flags) + err := deserializers.Deserialize(msg, flags) if err != nil { if recordFirstError(errChannel, err) { close(closing) diff --git a/internal/helpers/protobuf/protobuf.go b/internal/helpers/protobuf/protobuf.go new file mode 100644 index 0000000..0d92652 --- /dev/null +++ b/internal/helpers/protobuf/protobuf.go @@ -0,0 +1,89 @@ +package protobuf + +import ( + "io/ioutil" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" + + "github.com/deviceinsight/kafkactl/output" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/desc/protoparse" +) + +type SearchContext struct { + ProtosetFiles []string + ProtoFiles []string + ProtoImportPaths []string +} + +func ResolveMessageType(context SearchContext, typeName string) *desc.MessageDescriptor { + for _, descriptor := range makeDescriptors(context) { + if msg := descriptor.FindMessage(typeName); msg != nil { + return msg + } + } + + return nil +} + +func makeDescriptors(context SearchContext) []*desc.FileDescriptor { + var ret []*desc.FileDescriptor + + ret = appendProtosets(ret, context.ProtosetFiles) + + absFiles, err := protoparse.ResolveFilenames(context.ProtoImportPaths, context.ProtoFiles...) + if err != nil { + output.Warnf("Resolve proto files failed: %s", err) + return ret + } + + protoFiles, err := (&protoparse.Parser{ + ImportPaths: append([]string{"."}, context.ProtoImportPaths...), + InferImportPaths: true, + ErrorReporter: func(err protoparse.ErrorWithPos) error { + output.Warnf("Proto parser error [%s]: %s", err.GetPosition(), err) + return nil + }, + WarningReporter: func(pos protoparse.ErrorWithPos) { + output.Warnf("Proto parse warning: %s", err) + }, + }).ParseFiles(absFiles...) + if err != nil { + output.Warnf("Proto files parse error: %s", err) + } + + ret = append(ret, protoFiles...) + + return ret +} + +func appendProtosets(descs []*desc.FileDescriptor, protosetFiles []string) []*desc.FileDescriptor { + for _, protosetFile := range protosetFiles { + var files descriptorpb.FileDescriptorSet + + b, err := ioutil.ReadFile(protosetFile) + if err != nil { + output.Warnf("Read protoset file %s failed: %s", protosetFile, err) + continue + } + + if err = proto.Unmarshal(b, &files); err != nil { + output.Warnf("Parse protoset file %s failed: %s", protosetFile, err) + continue + } + + fds, err := desc.CreateFileDescriptorsFromSet(&files) + if err != nil { + output.Warnf("Convert file %s to descriptors failed: %s", protosetFile, err) + continue + } + + for _, fd := range fds { + descs = append(descs, fd) + } + + } + + return descs +} diff --git a/internal/producer/MessageSerializerChain.go b/internal/producer/MessageSerializerChain.go new file mode 100644 index 0000000..7e8ab0d --- /dev/null +++ b/internal/producer/MessageSerializerChain.go @@ -0,0 +1,47 @@ +package producer + +import ( + "github.com/Shopify/sarama" + "github.com/pkg/errors" +) + +type MessageSerializerChain struct { + topic string + serializers []MessageSerializer +} + +func (serializer MessageSerializerChain) Serialize(key, value []byte, flags Flags) (*sarama.ProducerMessage, error) { + for _, s := range serializer.serializers { + canSerialize, err := s.CanSerialize(serializer.topic) + if err != nil { + return nil, err + } + + if !canSerialize { + continue + } + + return s.Serialize(key, value, flags) + } + + return nil, errors.Errorf("can't find suitable serializer") +} + +func (serializer MessageSerializerChain) CanSerialize(topic string) (bool, error) { + if serializer.topic != topic { + return false, nil + } + + for _, s := range serializer.serializers { + canSerialize, err := s.CanSerialize(serializer.topic) + if err != nil { + return false, err + } + + if canSerialize { + return true, nil + } + } + + return false, nil +} diff --git a/internal/producer/ProtobufMessageSerializer.go b/internal/producer/ProtobufMessageSerializer.go new file mode 100644 index 0000000..ee679d4 --- /dev/null +++ b/internal/producer/ProtobufMessageSerializer.go @@ -0,0 +1,84 @@ +package producer + +import ( + "github.com/Shopify/sarama" + "github.com/deviceinsight/kafkactl/internal/helpers/protobuf" + "github.com/golang/protobuf/jsonpb" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/dynamic" + "github.com/pkg/errors" +) + +type ProtobufMessageSerializer struct { + topic string + keyDescriptor *desc.MessageDescriptor + valueDescriptor *desc.MessageDescriptor +} + +func CreateProtobufMessageSerializer(topic string, context protobuf.SearchContext, keyType, valueType string) (*ProtobufMessageSerializer, error) { + valueDescriptor := protobuf.ResolveMessageType(context, valueType) + if valueDescriptor == nil { + return nil, errors.Errorf("value message type %q not found in provided files", valueType) + } + + keyDescriptor := protobuf.ResolveMessageType(context, keyType) + if valueDescriptor == nil && keyType != "" { + return nil, errors.Errorf("key message type %q not found in provided files", valueType) + } + + return &ProtobufMessageSerializer{ + topic: topic, + keyDescriptor: keyDescriptor, + valueDescriptor: valueDescriptor, + }, nil +} + +func (serializer ProtobufMessageSerializer) CanSerialize(string) (bool, error) { + return true, nil +} + +func (serializer ProtobufMessageSerializer) Serialize(key, value []byte, flags Flags) (*sarama.ProducerMessage, error) { + recordHeaders, err := createRecordHeaders(flags) + if err != nil { + return nil, err + } + + message := &sarama.ProducerMessage{Topic: serializer.topic, Partition: flags.Partition, Headers: recordHeaders} + + if key != nil { + message.Key, err = encodeProtobuf(key, serializer.keyDescriptor, flags.KeyEncoding) + if err != nil { + return nil, err + } + } + + message.Value, err = encodeProtobuf(value, serializer.valueDescriptor, flags.ValueEncoding) + if err != nil { + return nil, err + } + + return message, nil +} + +func encodeProtobuf(data []byte, messageDescriptor *desc.MessageDescriptor, encoding string) (sarama.ByteEncoder, error) { + data, err := decodeBytes(data, encoding) + if err != nil { + return nil, err + } + + if messageDescriptor == nil { + return data, nil + } + + message := dynamic.NewMessage(messageDescriptor) + if err = message.UnmarshalJSONPB(&jsonpb.Unmarshaler{AllowUnknownFields: true}, data); err != nil { + return nil, errors.Wrap(err, "invalid json") + } + + pb, err := message.Marshal() + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go index 8d2b2dc..8d1152c 100644 --- a/internal/producer/producer-operation.go +++ b/internal/producer/producer-operation.go @@ -33,6 +33,11 @@ type Flags struct { ValueEncoding string Silent bool RateInSeconds int + ProtoFiles []string + ProtoImportPaths []string + ProtosetFiles []string + KeyProtoType string + ValueProtoType string } type Operation struct { @@ -83,28 +88,33 @@ func (operation *Operation) Produce(topic string, flags Flags) error { return errors.New("separator is used to split input from stdin/file. it cannot be used together with key or value") } - var serializer MessageSerializer + serializers := MessageSerializerChain{topic: topic} if clientContext.AvroSchemaRegistry != "" { - serializer, err = CreateAvroMessageSerializer(topic, clientContext.AvroSchemaRegistry) + serializer, err := CreateAvroMessageSerializer(topic, clientContext.AvroSchemaRegistry) if err != nil { return err } - if canSerialize, err := serializer.CanSerialize(topic); err != nil { + + serializers.serializers = append(serializers.serializers, serializer) + } + + if flags.ValueProtoType != "" { + context := clientContext.Protobuf + context.ProtosetFiles = append(flags.ProtosetFiles, context.ProtosetFiles...) + context.ProtoFiles = append(flags.ProtoFiles, context.ProtoFiles...) + context.ProtoImportPaths = append(flags.ProtoImportPaths, context.ProtoImportPaths...) + + serializer, err := CreateProtobufMessageSerializer(topic, context, flags.KeyProtoType, flags.ValueProtoType) + if err != nil { return err - } else if !canSerialize { - output.Debugf("no avro topic") - serializer = nil - } else { - output.Debugf("using AvroMessageSerializer") } - } - if serializer == nil { - output.Debugf("using DefaultMessageSerializer") - serializer = DefaultMessageSerializer{topic: topic} + serializers.serializers = append(serializers.serializers, serializer) } + serializers.serializers = append(serializers.serializers, DefaultMessageSerializer{topic: topic}) + output.Debugf("producer config: %+v", config.Producer) producer, err := sarama.NewSyncProducer(clientContext.Brokers, config) if err != nil { @@ -134,9 +144,9 @@ func (operation *Operation) Produce(topic string, flags Flags) error { var message *sarama.ProducerMessage if flags.NullValue { - message, err = serializer.Serialize([]byte(key), nil, flags) + message, err = serializers.Serialize([]byte(key), nil, flags) } else { - message, err = serializer.Serialize([]byte(key), []byte(flags.Value), flags) + message, err = serializers.Serialize([]byte(key), []byte(flags.Value), flags) } if err != nil { @@ -228,7 +238,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } messageCount++ - message, err := serializer.Serialize([]byte(key), []byte(value), flags) + message, err := serializers.Serialize([]byte(key), []byte(value), flags) if err != nil { return errors.Wrap(err, "Failed to produce message") } diff --git a/testutil/test_util.go b/testutil/test_util.go index 11228eb..fd6f32c 100644 --- a/testutil/test_util.go +++ b/testutil/test_util.go @@ -21,6 +21,8 @@ import ( "github.com/spf13/cobra" ) +var RootDir string + var random = rand.New(rand.NewSource(time.Now().UnixNano())) var configFile = "it-config.yml" @@ -58,6 +60,8 @@ func init() { panic(err) } + RootDir = rootDir + if err := os.Setenv("KAFKA_CTL_CONFIG", filepath.Join(rootDir, configFile)); err != nil { panic(err) } diff --git a/testutil/testdata/msg.proto b/testutil/testdata/msg.proto new file mode 100644 index 0000000..03f028b --- /dev/null +++ b/testutil/testdata/msg.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +// generate protoset file with 'protoc -o msg.protoset --include_imports msg.proto' + +message TopicMessage { + google.protobuf.Timestamp produced_at = 1; + int64 num = 2; +} + +message TopicKey { + float fvalue = 1; +} \ No newline at end of file diff --git a/testutil/testdata/msg.protoset b/testutil/testdata/msg.protoset new file mode 100644 index 0000000..f71d730 --- /dev/null +++ b/testutil/testdata/msg.protoset @@ -0,0 +1,15 @@ + +ÿ +google/protobuf/timestamp.protogoogle.protobuf"; + Timestamp +seconds (Rseconds +nanos (RnanosB… +com.google.protobufBTimestampProtoPZ2google.golang.org/protobuf/types/known/timestamppbø¢GPBªGoogle.Protobuf.WellKnownTypesbproto3 +· + msg.protogoogle/protobuf/timestamp.proto"] + TopicMessage; + produced_at ( 2.google.protobuf.TimestampR +producedAt +num (Rnum"" +TopicKey +fvalue (Rfvaluebproto3 \ No newline at end of file