diff --git a/backend/pkg/api/stream_progress_reporter.go b/backend/pkg/api/stream_progress_reporter.go index 8c1f21f17..e99b8c5be 100644 --- a/backend/pkg/api/stream_progress_reporter.go +++ b/backend/pkg/api/stream_progress_reporter.go @@ -125,14 +125,14 @@ func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) { OriginalPayload: message.Key.OriginalPayload, PayloadSize: int32(message.Key.PayloadSizeBytes), NormalizedPayload: message.Key.NormalizedPayload, - IsPayloadTooLarge: false, // TODO check for size + IsPayloadTooLarge: message.Key.IsPayloadTooLarge, Encoding: toProtoEncoding(message.Key.Encoding), }, Value: &v1alpha.KafkaRecordPayload{ OriginalPayload: message.Value.OriginalPayload, PayloadSize: int32(message.Value.PayloadSizeBytes), NormalizedPayload: message.Value.NormalizedPayload, - IsPayloadTooLarge: false, // TODO check for size + IsPayloadTooLarge: message.Value.IsPayloadTooLarge, Encoding: toProtoEncoding(message.Value.Encoding), }, } diff --git a/backend/pkg/serde/protobuf_schema_test.go b/backend/pkg/serde/protobuf_schema_test.go index 02a20279a..f252fb2be 100644 --- a/backend/pkg/serde/protobuf_schema_test.go +++ b/backend/pkg/serde/protobuf_schema_test.go @@ -455,60 +455,6 @@ func TestProtobufSchemaSerde_SerializeObject(t *testing.T) { assert.Equal(t, expectData, actualData) }) - t.Run("valid with index", func(t *testing.T) { - // TODO TEST THIS IN INTEGRATION TEST WITH ACTUAL SCHEMA - - // var testSerde sr.Serde - // testSerde.Register( - // 2000, - // &indexv1.Gadget_Gizmo{}, - // sr.EncodeFn(func(v any) ([]byte, error) { - // return proto.Marshal(v.(*indexv1.Gadget_Gizmo)) - // }), - // sr.DecodeFn(func(b []byte, v any) error { - // return proto.Unmarshal(b, v.(*indexv1.Gadget_Gizmo)) - // }), - // sr.Index(2, 0), - // ) - - // msg := indexv1.Gadget{ - // Identity: "gadget_0", - // Gizmo: &indexv1.Gadget_Gizmo{ - // Size: 10, - // Item: &indexv1.Item{ - // ItemType: indexv1.Item_ITEM_TYPE_PERSONAL, - // Name: "item_0", - // }, - // }, - // Widgets: []*indexv1.Widget{ - // { - // Id: "wid_0", - // }, - // { - // Id: "wid_1", - // }, - // }, - // } - - // expectData, err := testSerde.Encode(msg.GetGizmo()) - // require.NoError(t, err) - - // data := map[string]interface{}{ - // "size": 10, - // "item": map[string]interface{}{ - // "itemType": 1, - // "name": "item_0", - // }, - // } - - // serde := ProtobufSchemaSerde{ProtoSvc: testProtoSvc} - - // actualData, err := serde.SerializeObject(data, PayloadTypeValue, WithSchemaID(2000), WithIndex(2, 0)) - // assert.NoError(t, err) - - // assert.Equal(t, expectData, actualData) - }) - t.Run("invalid schema id", func(t *testing.T) { data := map[string]interface{}{ "id": "333",