Skip to content

Commit

Permalink
Merge pull request #879 from redpanda-data/backend/common_proto_types
Browse files Browse the repository at this point in the history
Backend: Add Common protobuf types
  • Loading branch information
bojand authored Oct 16, 2023
2 parents c7f788e + 12bd001 commit 97227b9
Show file tree
Hide file tree
Showing 29 changed files with 1,922 additions and 4 deletions.
2 changes: 1 addition & 1 deletion backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
golang.org/x/net v0.17.0
golang.org/x/sync v0.3.0
golang.org/x/text v0.13.0
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97
google.golang.org/genproto/googleapis/api v0.0.0-20230920204549-e6e6cdab5c13
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c
google.golang.org/grpc v1.58.3
Expand Down Expand Up @@ -154,7 +155,6 @@ require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
176 changes: 174 additions & 2 deletions backend/pkg/kafka/deserializer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/type/color"
"google.golang.org/genproto/googleapis/type/dayofweek"
"google.golang.org/genproto/googleapis/type/decimal"
"google.golang.org/genproto/googleapis/type/fraction"
"google.golang.org/genproto/googleapis/type/latlng"
"google.golang.org/genproto/googleapis/type/money"
"google.golang.org/genproto/googleapis/type/month"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/redpanda-data/console/backend/pkg/config"
"github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/common"
indexv1 "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/index/v1"
shopv1 "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/shop/v1"
shopv2 "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/shop/v2"
Expand Down Expand Up @@ -574,6 +582,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Expand All @@ -582,7 +591,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
Timestamp: orderCreatedAt,
}

produceCtx, produceCancel := context.WithTimeout(context.Background(), 3*time.Second)
produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer produceCancel()

results := s.kafkaClient.ProduceSync(produceCtx, r)
Expand Down Expand Up @@ -634,6 +643,166 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
assert.Equal(timestamppb.New(orderCreatedAt).GetSeconds(), rOrder.GetCreatedAt().GetSeconds())
})

t.Run("schema registry protobuf common", func(t *testing.T) {
// create the topic
testTopicName := testutil.TopicNameForTest("deserializer_schema_protobuf_common")
_, err := s.kafkaAdminClient.CreateTopic(ctx, 1, 1, nil, testTopicName)
require.NoError(err)

defer func() {
_, err := s.kafkaAdminClient.DeleteTopics(ctx, testTopicName)
assert.NoError(err)
}()

registryURL := "http://" + s.registryAddress

// register the protobuf schema
rcl, err := sr.NewClient(sr.URLs(registryURL))
require.NoError(err)

protoFile, err := os.ReadFile("testdata/proto/common/common.proto")
require.NoError(err)

ss, err := rcl.CreateSchema(context.Background(), testTopicName+"-value", sr.Schema{
Schema: string(protoFile),
Type: sr.TypeProtobuf,
})
require.NoError(err)
require.NotNil(ss)

// test
cfg := s.createBaseConfig()

metricName := testutil.MetricNameForTest(strings.ReplaceAll("deserializer", " ", ""))

svc, err := NewService(&cfg, s.log, metricName)
require.NoError(err)

svc.Start()

// Set up Serde
var serde sr.Serde
serde.Register(
ss.ID,
&common.CommonMessage{},
sr.EncodeFn(func(v any) ([]byte, error) {
return proto.Marshal(v.(*common.CommonMessage))
}),
sr.DecodeFn(func(b []byte, v any) error {
return proto.Unmarshal(b, v.(*common.CommonMessage))
}),
sr.Index(0),
)

messageCreatedAt := time.Date(2023, time.July, 11, 13, 0, 0, 0, time.UTC)
msg := common.CommonMessage{
Id: "345",
DecVal: &decimal.Decimal{
Value: "-1.50",
},
Color: &color.Color{
Red: 0.1,
Green: 0.2,
Blue: 0.3,
},
Dow: dayofweek.DayOfWeek_FRIDAY,
Fraction: &fraction.Fraction{
Numerator: 10,
Denominator: 20,
},
Latlng: &latlng.LatLng{
Latitude: 45.45,
Longitude: 12.34,
},
Price: &money.Money{
CurrencyCode: "USD",
Units: 100,
},
Month: month.Month_JANUARY,
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Value: msgData,
Topic: testTopicName,
Timestamp: messageCreatedAt,
}

produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer produceCancel()

results := s.kafkaClient.ProduceSync(produceCtx, r)
require.NoError(results.FirstErr())

consumeCtx, consumeCancel := context.WithTimeout(context.Background(), 1*time.Second)
defer consumeCancel()

cl := s.consumerClientForTopic(testTopicName)

var record *kgo.Record

for {
fetches := cl.PollFetches(consumeCtx)
errs := fetches.Errors()
if fetches.IsClientClosed() ||
(len(errs) == 1 && (errors.Is(errs[0].Err, context.DeadlineExceeded) || errors.Is(errs[0].Err, context.Canceled))) {
break
}

require.Empty(errs)

iter := fetches.RecordIter()

for !iter.Done() && record == nil {
record = iter.Next()
break
}
}

require.NotEmpty(record)

dr := svc.Deserializer.DeserializeRecord(record)
require.NotNil(dr)
assert.Equal(messageEncodingProtobuf, dr.Value.Payload.RecognizedEncoding)
assert.IsType(map[string]interface{}{}, dr.Value.Object)

cm := common.CommonMessage{}
err = protojson.Unmarshal(dr.Value.Payload.Payload, &cm)
require.NoError(err)
assert.Equal("345", cm.Id)
assert.Equal("-1.50", cm.GetDecVal().GetValue())
assert.Equal(float32(0.1), cm.GetColor().GetRed())
assert.Equal(float32(0.2), cm.GetColor().GetGreen())
assert.Equal(float32(0.3), cm.GetColor().GetBlue())
assert.Equal(int64(10), cm.GetFraction().GetNumerator())
assert.Equal(int64(20), cm.GetFraction().GetDenominator())
assert.Equal(45.45, cm.GetLatlng().GetLatitude())
assert.Equal(12.34, cm.GetLatlng().GetLongitude())
assert.Equal("USD", cm.GetPrice().GetCurrencyCode())
assert.Equal(int64(100), cm.GetPrice().GetUnits())
assert.Equal("JANUARY", cm.GetMonth().String())

// franz-go serde
cm = common.CommonMessage{}
err = serde.Decode(record.Value, &cm)
require.NoError(err)
assert.Equal("345", cm.Id)
assert.Equal("-1.50", cm.GetDecVal().GetValue())
assert.Equal(float32(0.1), cm.GetColor().GetRed())
assert.Equal(float32(0.2), cm.GetColor().GetGreen())
assert.Equal(float32(0.3), cm.GetColor().GetBlue())
assert.Equal(int64(10), cm.GetFraction().GetNumerator())
assert.Equal(int64(20), cm.GetFraction().GetDenominator())
assert.Equal(45.45, cm.GetLatlng().GetLatitude())
assert.Equal(12.34, cm.GetLatlng().GetLongitude())
assert.Equal("USD", cm.GetPrice().GetCurrencyCode())
assert.Equal(int64(100), cm.GetPrice().GetUnits())
assert.Equal("JANUARY", cm.GetMonth().String())
})

t.Run("schema registry protobuf multi", func(t *testing.T) {
// create the topic
testTopicName := testutil.TopicNameForTest("deserializer_schema_protobuf_multi")
Expand Down Expand Up @@ -705,6 +874,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.GetIdentity()),
Expand Down Expand Up @@ -857,7 +1027,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
Topic: testTopicName,
}

produceCtx, produceCancel := context.WithTimeout(context.Background(), 3*time.Second)
produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer produceCancel()

results := s.kafkaClient.ProduceSync(produceCtx, r)
Expand Down Expand Up @@ -1071,6 +1241,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Expand Down Expand Up @@ -1239,6 +1410,7 @@ func (s *KafkaIntegrationTestSuite) TestDeserializeRecord() {
}

msgData, err := serde.Encode(&msg)
require.NoError(err)

r := &kgo.Record{
Key: []byte(msg.Id),
Expand Down
8 changes: 8 additions & 0 deletions backend/pkg/kafka/testdata/proto/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: googleapis
repository: googleapis
commit: 28151c0d0a1641bf938a7672c500e01d
digest: shake256:49215edf8ef57f7863004539deff8834cfb2195113f0b890dd1f67815d9353e28e668019165b9d872395871eeafcbab3ccfdb2b5f11734d3cca95be9e8d139de
2 changes: 2 additions & 0 deletions backend/pkg/kafka/testdata/proto/buf.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
version: v1
deps:
- buf.build/googleapis/googleapis
breaking:
use:
- FILE
Expand Down
35 changes: 35 additions & 0 deletions backend/pkg/kafka/testdata/proto/common/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

syntax = "proto3";

package common;

import "google/type/decimal.proto";
import "google/type/color.proto";
import "google/type/dayofweek.proto";
import "google/type/fraction.proto";
import "google/type/latlng.proto";
import "google/type/money.proto";
import "google/type/month.proto";
import "google/type/phone_number.proto";

option go_package = "github.com/redpanda-data/console/backend/pkg/kafka/testdata/proto/gen/common";

message CommonMessage {
string id = 1;
google.type.Decimal dec_val = 2;
google.type.Color color = 3;
google.type.DayOfWeek dow = 4;
google.type.Fraction fraction = 5;
google.type.LatLng latlng = 6;
google.type.Money price = 7;
google.type.Month month = 8;
google.type.PhoneNumber phone_number = 9;
}
Loading

0 comments on commit 97227b9

Please sign in to comment.