Skip to content

Commit

Permalink
backend: minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Aug 30, 2023
1 parent 9aa43d1 commit dde5877
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 56 deletions.
4 changes: 2 additions & 2 deletions backend/pkg/api/stream_progress_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
OriginalPayload: message.Key.OriginalPayload,
PayloadSize: int32(message.Key.PayloadSizeBytes),
NormalizedPayload: message.Key.NormalizedPayload,
IsPayloadTooLarge: false, // TODO check for size
IsPayloadTooLarge: message.Key.IsPayloadTooLarge,
Encoding: toProtoEncoding(message.Key.Encoding),
},
Value: &v1alpha.KafkaRecordPayload{
OriginalPayload: message.Value.OriginalPayload,
PayloadSize: int32(message.Value.PayloadSizeBytes),
NormalizedPayload: message.Value.NormalizedPayload,
IsPayloadTooLarge: false, // TODO check for size
IsPayloadTooLarge: message.Value.IsPayloadTooLarge,
Encoding: toProtoEncoding(message.Value.Encoding),
},
}
Expand Down
54 changes: 0 additions & 54 deletions backend/pkg/serde/protobuf_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,60 +455,6 @@ func TestProtobufSchemaSerde_SerializeObject(t *testing.T) {
assert.Equal(t, expectData, actualData)
})

t.Run("valid with index", func(t *testing.T) {
// TODO TEST THIS IN INTEGRATION TEST WITH ACTUAL SCHEMA

// var testSerde sr.Serde
// testSerde.Register(
// 2000,
// &indexv1.Gadget_Gizmo{},
// sr.EncodeFn(func(v any) ([]byte, error) {
// return proto.Marshal(v.(*indexv1.Gadget_Gizmo))
// }),
// sr.DecodeFn(func(b []byte, v any) error {
// return proto.Unmarshal(b, v.(*indexv1.Gadget_Gizmo))
// }),
// sr.Index(2, 0),
// )

// msg := indexv1.Gadget{
// Identity: "gadget_0",
// Gizmo: &indexv1.Gadget_Gizmo{
// Size: 10,
// Item: &indexv1.Item{
// ItemType: indexv1.Item_ITEM_TYPE_PERSONAL,
// Name: "item_0",
// },
// },
// Widgets: []*indexv1.Widget{
// {
// Id: "wid_0",
// },
// {
// Id: "wid_1",
// },
// },
// }

// expectData, err := testSerde.Encode(msg.GetGizmo())
// require.NoError(t, err)

// data := map[string]interface{}{
// "size": 10,
// "item": map[string]interface{}{
// "itemType": 1,
// "name": "item_0",
// },
// }

// serde := ProtobufSchemaSerde{ProtoSvc: testProtoSvc}

// actualData, err := serde.SerializeObject(data, PayloadTypeValue, WithSchemaID(2000), WithIndex(2, 0))
// assert.NoError(t, err)

// assert.Equal(t, expectData, actualData)
})

t.Run("invalid schema id", func(t *testing.T) {
data := map[string]interface{}{
"id": "333",
Expand Down

0 comments on commit dde5877

Please sign in to comment.