Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend: Add Common protobuf types #879

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
golang.org/x/net v0.17.0
golang.org/x/sync v0.3.0
golang.org/x/text v0.13.0
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97
google.golang.org/genproto/googleapis/api v0.0.0-20230920204549-e6e6cdab5c13
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c
google.golang.org/grpc v1.58.3
Expand Down Expand Up @@ -154,7 +155,6 @@ require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
176 changes: 174 additions & 2 deletions backend/pkg/kafka/deserializer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/type/color"
"google.golang.org/genproto/googleapis/type/dayofweek"
"google.golang.org/genproto/googleapis/type/decimal"
"google.golang.org/genproto/googleapis/type/fraction"
"google.golang.org/genproto/googleapis/type/latlng"
"google.golang.org/genproto/googleapis/type/money"
"google.golang.org/genproto/googleapis/type/month"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/redpanda-data/console/backend/pkg/config"
"github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/common"
indexv1 "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/index/v1"
shopv1 "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/shop/v1"
shopv2 "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/shop/v2"
Expand Down Expand Up @@ -574,6 +582,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Expand All @@ -582,7 +591,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
Timestamp: orderCreatedAt,
}

produceCtx, produceCancel := context.WithTimeout(context.Background(), 3*time.Second)
produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer produceCancel()

results := s.kafkaClient.ProduceSync(produceCtx, r)
Expand Down Expand Up @@ -634,6 +643,166 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
assert.Equal(timestamppb.New(orderCreatedAt).GetSeconds(), rOrder.GetCreatedAt().GetSeconds())
})

t.Run("schema registry protobuf common", func(t *testing.T) {
// create the topic
testTopicName := testutil.TopicNameForTest("deserializer_schema_protobuf_common")
_, err := s.kafkaAdminClient.CreateTopic(ctx, 1, 1, nil, testTopicName)
require.NoError(err)

defer func() {
_, err := s.kafkaAdminClient.DeleteTopics(ctx, testTopicName)
assert.NoError(err)
}()

registryURL := "http://" + s.registryAddress

// register the protobuf schema
rcl, err := sr.NewClient(sr.URLs(registryURL))
require.NoError(err)

protoFile, err := os.ReadFile("testdata/proto/common/common.proto")
require.NoError(err)

ss, err := rcl.CreateSchema(context.Background(), testTopicName+"-value", sr.Schema{
Schema: string(protoFile),
Type: sr.TypeProtobuf,
})
require.NoError(err)
require.NotNil(ss)

// test
cfg := s.createBaseConfig()

metricName := testutil.MetricNameForTest(strings.ReplaceAll("deserializer", " ", ""))

svc, err := NewService(&cfg, s.log, metricName)
require.NoError(err)

svc.Start()

// Set up Serde
var serde sr.Serde
serde.Register(
ss.ID,
&common.CommonMessage{},
sr.EncodeFn(func(v any) ([]byte, error) {
return proto.Marshal(v.(*common.CommonMessage))
}),
sr.DecodeFn(func(b []byte, v any) error {
return proto.Unmarshal(b, v.(*common.CommonMessage))
}),
sr.Index(0),
)

messageCreatedAt := time.Date(2023, time.July, 11, 13, 0, 0, 0, time.UTC)
msg := common.CommonMessage{
Id: "345",
DecVal: &decimal.Decimal{
Value: "-1.50",
},
Color: &color.Color{
Red: 0.1,
Green: 0.2,
Blue: 0.3,
},
Dow: dayofweek.DayOfWeek_FRIDAY,
Fraction: &fraction.Fraction{
Numerator: 10,
Denominator: 20,
},
Latlng: &latlng.LatLng{
Latitude: 45.45,
Longitude: 12.34,
},
Price: &money.Money{
CurrencyCode: "USD",
Units: 100,
},
Month: month.Month_JANUARY,
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Value: msgData,
Topic: testTopicName,
Timestamp: messageCreatedAt,
}

produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer produceCancel()

results := s.kafkaClient.ProduceSync(produceCtx, r)
require.NoError(results.FirstErr())

consumeCtx, consumeCancel := context.WithTimeout(context.Background(), 1*time.Second)
defer consumeCancel()

cl := s.consumerClientForTopic(testTopicName)

var record *kgo.Record

for {
fetches := cl.PollFetches(consumeCtx)
errs := fetches.Errors()
if fetches.IsClientClosed() ||
(len(errs) == 1 && (errors.Is(errs[0].Err, context.DeadlineExceeded) || errors.Is(errs[0].Err, context.Canceled))) {
break
}

require.Empty(errs)

iter := fetches.RecordIter()

for !iter.Done() && record == nil {
record = iter.Next()
break
}
}

require.NotEmpty(record)

dr := svc.Deserializer.DeserializeRecord(record)
require.NotNil(dr)
assert.Equal(messageEncodingProtobuf, dr.Value.Payload.RecognizedEncoding)
assert.IsType(map[string]interface{}{}, dr.Value.Object)

cm := common.CommonMessage{}
err = protojson.Unmarshal(dr.Value.Payload.Payload, &cm)
require.NoError(err)
assert.Equal("345", cm.Id)
assert.Equal("-1.50", cm.GetDecVal().GetValue())
assert.Equal(float32(0.1), cm.GetColor().GetRed())
assert.Equal(float32(0.2), cm.GetColor().GetGreen())
assert.Equal(float32(0.3), cm.GetColor().GetBlue())
assert.Equal(int64(10), cm.GetFraction().GetNumerator())
assert.Equal(int64(20), cm.GetFraction().GetDenominator())
assert.Equal(45.45, cm.GetLatlng().GetLatitude())
assert.Equal(12.34, cm.GetLatlng().GetLongitude())
assert.Equal("USD", cm.GetPrice().GetCurrencyCode())
assert.Equal(int64(100), cm.GetPrice().GetUnits())
assert.Equal("JANUARY", cm.GetMonth().String())

// franz-go serde
cm = common.CommonMessage{}
err = serde.Decode(record.Value, &cm)
require.NoError(err)
assert.Equal("345", cm.Id)
assert.Equal("-1.50", cm.GetDecVal().GetValue())
assert.Equal(float32(0.1), cm.GetColor().GetRed())
assert.Equal(float32(0.2), cm.GetColor().GetGreen())
assert.Equal(float32(0.3), cm.GetColor().GetBlue())
assert.Equal(int64(10), cm.GetFraction().GetNumerator())
assert.Equal(int64(20), cm.GetFraction().GetDenominator())
assert.Equal(45.45, cm.GetLatlng().GetLatitude())
assert.Equal(12.34, cm.GetLatlng().GetLongitude())
assert.Equal("USD", cm.GetPrice().GetCurrencyCode())
assert.Equal(int64(100), cm.GetPrice().GetUnits())
assert.Equal("JANUARY", cm.GetMonth().String())
})

t.Run("schema registry protobuf multi", func(t *testing.T) {
// create the topic
testTopicName := testutil.TopicNameForTest("deserializer_schema_protobuf_multi")
Expand Down Expand Up @@ -705,6 +874,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.GetIdentity()),
Expand Down Expand Up @@ -857,7 +1027,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
Topic: testTopicName,
}

produceCtx, produceCancel := context.WithTimeout(context.Background(), 3*time.Second)
produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer produceCancel()

results := s.kafkaClient.ProduceSync(produceCtx, r)
Expand Down Expand Up @@ -1071,6 +1241,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Expand Down Expand Up @@ -1239,6 +1410,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Expand Down
8 changes: 8 additions & 0 deletions backend/pkg/kafka/testdata/proto/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: googleapis
repository: googleapis
commit: 28151c0d0a1641bf938a7672c500e01d
digest: shake256:49215edf8ef57f7863004539deff8834cfb2195113f0b890dd1f67815d9353e28e668019165b9d872395871eeafcbab3ccfdb2b5f11734d3cca95be9e8d139de
2 changes: 2 additions & 0 deletions backend/pkg/kafka/testdata/proto/buf.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
version: v1
deps:
- buf.build/googleapis/googleapis
breaking:
use:
- FILE
Expand Down
35 changes: 35 additions & 0 deletions backend/pkg/kafka/testdata/proto/common/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

syntax = "proto3";

package common;

import "google/type/decimal.proto";
import "google/type/color.proto";
import "google/type/dayofweek.proto";
import "google/type/fraction.proto";
import "google/type/latlng.proto";
import "google/type/money.proto";
import "google/type/month.proto";
import "google/type/phone_number.proto";

option go_package = "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/common";

message CommonMessage {
string id = 1;
google.type.Decimal dec_val = 2;
google.type.Color color = 3;
google.type.DayOfWeek dow = 4;
google.type.Fraction fraction = 5;
google.type.LatLng latlng = 6;
google.type.Money price = 7;
google.type.Month month = 8;
google.type.PhoneNumber phone_number = 9;
}
Loading
Loading