From 569e38ac9386d619a1d5727ff3a1a04e82b1163e Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 7 Feb 2025 00:51:11 +0300 Subject: [PATCH 1/2] Added log grpc messages metadata on trace log level for topic writer --- CHANGELOG.md | 2 + internal/grpcwrapper/rawtopic/client.go | 13 ++- .../rawtopic/rawtopicwriter/streamwriter.go | 12 ++ internal/topic/topicclientinternal/client.go | 4 +- .../topicwriterinternal/writer_reconnector.go | 4 +- .../writer_reconnector_test.go | 26 +++-- log/topic.go | 70 +++++++++++ tests/integration/topic_read_writer_test.go | 56 +++++++++ trace/details.go | 25 ++-- trace/topic.go | 23 ++++ trace/topic_gtrace.go | 109 +++++++++++++++++- 11 files changed, 313 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9526ac97..8759dabaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added log grpc messages metadata on trace log level for topic writer + ## v3.99.5 * Fixed error `Empty query text` using prepared statements and `ydb.WithExecuteDataQueryOverQueryClient(true)` option * Prepared statements always send query text on Execute call from now (previous behaviour - send query ID) diff --git a/internal/grpcwrapper/rawtopic/client.go b/internal/grpcwrapper/rawtopic/client.go index 77fa4001e..530986dfa 100644 --- a/internal/grpcwrapper/rawtopic/client.go +++ b/internal/grpcwrapper/rawtopic/client.go @@ -3,6 +3,8 @@ package rawtopic import ( "context" "fmt" + "github.com/google/uuid" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" "github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1" @@ -99,7 +101,10 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S return rawtopicreader.StreamReader{Stream: protoResp}, nil } -func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter.StreamWriter, error) { +func (c *Client) StreamWrite( + ctxStreamLifeTime context.Context, + tracer *trace.Topic, +) (*rawtopicwriter.StreamWriter, error) { protoResp, err := c.service.StreamWrite(ctxStreamLifeTime) if err != nil { return nil, xerrors.WithStackTrace( @@ -109,7 +114,11 @@ func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter ) } - return &rawtopicwriter.StreamWriter{Stream: protoResp}, nil + return &rawtopicwriter.StreamWriter{ + Stream: protoResp, + Tracer: tracer, + InternalStreamID: uuid.New().String(), + }, nil } func (c *Client) UpdateOffsetsInTransaction( diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go index 8ff0b9727..9ab144dd5 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go @@ -3,6 +3,7 @@ package rawtopicwriter import ( "errors" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" "reflect" "sync" "sync/atomic" @@ -29,6 +30,12 @@ type StreamWriter struct { sendCloseMtx sync.Mutex Stream GrpcStream + + Tracer *trace.Topic + InternalStreamID string + readMessagesCount int + writtenMessagesCount int + sessionID string } func (w *StreamWriter) Recv() (ServerMessage, error) { @@ -40,6 +47,8 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { } grpcMsg, err := w.Stream.Recv() + w.readMessagesCount++ + trace.TopicOnWriterReceiveGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, err) if err != nil { if !xerrors.IsErrorFromServer(err) { err = xerrors.Transport(err) @@ -64,6 +73,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { var res InitResult res.ServerMessageMetadata = meta res.mustFromProto(v.InitResponse) + w.sessionID = res.SessionID return &res, nil case *Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse: @@ -124,6 +134,8 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) { } err = w.Stream.Send(&protoMsg) + w.writtenMessagesCount++ + trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err))) } diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 60050c26e..cb4672c25 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -356,11 +356,11 @@ func (c *Client) createWriterConfig( topicPath string, opts []topicoptions.WriterOption, ) topicwriterinternal.WriterReconnectorConfig { - var connector topicwriterinternal.ConnectFunc = func(ctx context.Context) ( + var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( topicwriterinternal.RawTopicWriterStream, error, ) { - return c.rawClient.StreamWrite(ctx) + return c.rawClient.StreamWrite(ctx, tracer) } options := []topicoptions.WriterOption{ diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 2491e5a0c..217652cd7 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -517,7 +517,7 @@ func (w *WriterReconnector) connectWithTimeout(streamLifetimeContext context.Con } }() - stream, err := w.cfg.Connect(connectCtx) + stream, err := w.cfg.Connect(connectCtx, w.cfg.Tracer) resCh <- resT{stream: stream, err: err} }() @@ -789,7 +789,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *Multi return res } -type ConnectFunc func(ctx context.Context) (RawTopicWriterStream, error) +type ConnectFunc func(ctx context.Context, tracer *trace.Topic) (RawTopicWriterStream, error) func createPublicCodecsFromRaw(codecs rawtopiccommon.SupportedCodecs) []topictypes.Codec { res := make([]topictypes.Codec, len(codecs)) diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 405265399..7bcf4a091 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" "io" "math" "sort" @@ -446,7 +447,7 @@ func TestWriterImpl_Reconnect(t *testing.T) { connectCalled := false connectCalledChan := make(empty.Chan) - w.cfg.Connect = func(streamCtxArg context.Context) (RawTopicWriterStream, error) { + w.cfg.Connect = func(streamCtxArg context.Context, _ *trace.Topic) (RawTopicWriterStream, error) { close(connectCalledChan) connectCalled = true require.NotEqual(t, ctx, streamCtxArg) @@ -562,7 +563,7 @@ func TestWriterImpl_Reconnect(t *testing.T) { } var connectionAttempt atomic.Int64 - w.cfg.Connect = func(ctx context.Context) (RawTopicWriterStream, error) { + w.cfg.Connect = func(ctx context.Context, _ *trace.Topic) (RawTopicWriterStream, error) { attemptIndex := int(connectionAttempt.Add(1)) - 1 t.Logf("connect with attempt index: %v", attemptIndex) res := connectsResult[attemptIndex] @@ -1078,17 +1079,18 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv { partitionID: 14, } - writerOptions := append(defaultTestWriterOptions(), WithConnectFunc(func(ctx context.Context) ( - RawTopicWriterStream, - error, - ) { - connectNum := atomic.AddInt64(&res.connectCount, 1) - if connectNum > 1 { - t.Fatalf("test: default env support most one connection") - } + writerOptions := append(defaultTestWriterOptions(), WithConnectFunc( + func(ctx context.Context, _ *trace.Topic) ( + RawTopicWriterStream, + error, + ) { + connectNum := atomic.AddInt64(&res.connectCount, 1) + if connectNum > 1 { + t.Fatalf("test: default env support most one connection") + } - return res.stream, nil - })) + return res.stream, nil + })) writerOptions = append(writerOptions, options.writerOptions...) res.writer = newWriterReconnectorStopped(NewWriterReconnectorConfig(writerOptions...)) diff --git a/log/topic.go b/log/topic.go index 2d1b8164d..664d6391b 100644 --- a/log/topic.go +++ b/log/topic.go @@ -2,6 +2,9 @@ package log import ( "context" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/kv" @@ -885,6 +888,39 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { kv.Version(), ) } + + t.OnWriterSentGRPCMessage = func(info trace.TopicWriterSentGRPCMessageInfo) { + if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 { + return + } + + ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + l.Log( + ctx, "topic writer sent grpc message (message body and metadata are removed)", + kv.String("topic_stream_internal_id", info.TopicStreamInternalID), + kv.String("session_id", info.SessionID), + kv.Int("message_number", info.MessageNumber), + kv.Stringer("message", lazyProtoStringifer{info.Message}), + kv.Error(info.Error), + kv.Version(), + ) + } + t.OnWriterReceiveGRPCMessage = func(info trace.TopicWriterReceiveGRPCMessageInfo) { + if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 { + return + } + + ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + l.Log( + ctx, "topic writer received grpc message (message body and metadata are removed)", + kv.String("topic_stream_internal_id", info.TopicStreamInternalID), + kv.String("session_id", info.SessionID), + kv.Int("message_number", info.MessageNumber), + kv.Stringer("message", lazyProtoStringifer{info.Message}), + kv.Error(info.Error), + kv.Version(), + ) + } t.OnWriterReadUnknownGrpcMessage = func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return @@ -899,3 +935,37 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return t } + +type lazyProtoStringifer struct { + message proto.Message +} + +func (s lazyProtoStringifer) String() string { + // cut message data + if writeRequest, ok := s.message.(*Ydb_Topic.StreamWriteMessage_FromClient); ok { + if data := writeRequest.GetWriteRequest(); data != nil { + type messDataType struct { + Data []byte + Metadata []*Ydb_Topic.MetadataItem + } + storage := make([]messDataType, len(data.Messages)) + for i := range data.Messages { + storage[i].Data = data.Messages[i].Data + data.Messages[i] = nil + + storage[i].Metadata = data.Messages[i].MetadataItems + data.Messages[i].MetadataItems = nil + } + + defer func() { + for i := range data.Messages { + data.Messages[i].Data = storage[i].Data + data.Messages[i].MetadataItems = storage[i].Metadata + } + }() + } + } + + res := protojson.MarshalOptions{AllowPartial: true}.Format(s.message) + return res +} diff --git a/tests/integration/topic_read_writer_test.go b/tests/integration/topic_read_writer_test.go index 19f440b4b..b10e6c938 100644 --- a/tests/integration/topic_read_writer_test.go +++ b/tests/integration/topic_read_writer_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/binary" "errors" + "github.com/ydb-platform/ydb-go-sdk/v3/log" "io" "os" "runtime/pprof" @@ -33,6 +34,61 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +func TestTopicWriterLogMessagesWithoutData(t *testing.T) { + scope := newScope(t) + + producerID := "dkeujsl" + const seqNoInt = 486812497 + seqNoString := "486812497" + data := "kdjwkruowe" + metaKey := "gyoeexiufo" + metaValue := "fjedeikeosbv" + + logs := &strings.Builder{} + writer, err := scope.Driver().Topic().StartWriter( + scope.TopicPath(), + topicoptions.WithWriterProducerID(producerID), + topicoptions.WithWriterSetAutoSeqNo(false), + topicoptions.WithWriterWaitServerAck(true), + topicoptions.WithWriterTrace(log.Topic( + log.Default(logs, log.WithMinLevel(log.TRACE)), trace.TopicWriterStreamGrpcMessageEvents), + ), + ) + + scope.Require.NoError(err) + err = writer.Write(scope.Ctx, + topicwriter.Message{ + SeqNo: seqNoInt, + Data: strings.NewReader(data), + Metadata: map[string][]byte{ + metaKey: []byte(metaValue), + }, + }, + ) + scope.Require.NoError(err) + + err = writer.Close(scope.Ctx) + scope.Require.NoError(err) + + logsString := logs.String() + scope.Require.Contains(logsString, producerID) + scope.Require.Contains(logsString, seqNoString) + scope.Require.NotContains(logsString, metaKey) + scope.Require.NotContains(logsString, metaValue) + scope.Require.NotContains(logsString, data) + + mess, err := scope.TopicReader().ReadMessage(scope.Ctx) + scope.Require.NoError(err) + + scope.Require.Equal(producerID, mess.ProducerID) + scope.Require.Equal(int64(seqNoInt), mess.SeqNo) + scope.Require.Equal(metaValue, string(mess.Metadata[metaKey])) + + messData, err := io.ReadAll(mess) + scope.Require.NoError(err) + scope.Require.Equal(data, string(messData)) +} + func TestSendAsyncMessages(t *testing.T) { ctx := context.Background() db := connect(t) diff --git a/trace/details.go b/trace/details.go index 4b335b748..1d7fdf560 100644 --- a/trace/details.go +++ b/trace/details.go @@ -64,6 +64,7 @@ const ( TopicWriterStreamLifeCycleEvents TopicWriterStreamEvents + TopicWriterStreamGrpcMessageEvents DatabaseSQLConnectorEvents DatabaseSQLConnEvents @@ -116,7 +117,10 @@ const ( TopicReaderPartitionEvents | TopicReaderStreamLifeCycleEvents - TopicEvents = TopicControlPlaneEvents | TopicReaderEvents + TopicWriterEvents = TopicWriterStreamLifeCycleEvents | TopicWriterStreamEvents | + TopicWriterStreamGrpcMessageEvents + + TopicEvents = TopicControlPlaneEvents | TopicReaderEvents | TopicWriterEvents DatabaseSQLEvents = DatabaseSQLConnectorEvents | DatabaseSQLConnEvents | @@ -169,15 +173,16 @@ var ( DatabaseSQLTxEvents: "ydb.database.sql.tx", DatabaseSQLStmtEvents: "ydb.database.sql.stmt", - TopicEvents: "ydb.topic", - TopicControlPlaneEvents: "ydb.topic.controlplane", - TopicReaderEvents: "ydb.topic.reader", - TopicReaderStreamEvents: "ydb.topic.reader.stream", - TopicReaderMessageEvents: "ydb.topic.reader.message", - TopicReaderPartitionEvents: "ydb.topic.reader.partition", - TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle", - TopicWriterStreamLifeCycleEvents: "ydb.topic.writer.lifecycle", - TopicWriterStreamEvents: "ydb.topic.writer.stream", + TopicEvents: "ydb.topic", + TopicControlPlaneEvents: "ydb.topic.controlplane", + TopicReaderEvents: "ydb.topic.reader", + TopicReaderStreamEvents: "ydb.topic.reader.stream", + TopicReaderMessageEvents: "ydb.topic.reader.message", + TopicReaderPartitionEvents: "ydb.topic.reader.partition", + TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle", + TopicWriterStreamLifeCycleEvents: "ydb.topic.writer.lifecycle", + TopicWriterStreamEvents: "ydb.topic.writer.stream", + TopicWriterStreamGrpcMessageEvents: "ydb.topic.writer.grpc", } defaultDetails = DetailsAll ) diff --git a/trace/topic.go b/trace/topic.go index 7d68906e7..4819d4c18 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -2,6 +2,7 @@ package trace import ( "context" + "google.golang.org/protobuf/proto" ) // tool gtrace used from ./internal/cmd/gtrace @@ -133,6 +134,10 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnWriterReceiveResult func(TopicWriterResultMessagesInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnWriterSentGRPCMessage func(TopicWriterSentGRPCMessageInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnWriterReceiveGRPCMessage func(TopicWriterReceiveGRPCMessageInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnWriterReadUnknownGrpcMessage func(TopicOnWriterReadUnknownGrpcMessageInfo) } @@ -534,6 +539,24 @@ type ( CloseError error } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TopicWriterSentGRPCMessageInfo struct { + TopicStreamInternalID string + SessionID string + MessageNumber int + Message proto.Message + Error error + } + + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TopicWriterReceiveGRPCMessageInfo struct { + TopicStreamInternalID string + SessionID string + MessageNumber int + Message proto.Message + Error error + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterReadUnknownGrpcMessageInfo struct { WriterInstanceID string diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index 5733d21a1..f9ab97a71 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -4,6 +4,8 @@ package trace import ( "context" + + "google.golang.org/protobuf/reflect/protoreflect" ) // topicComposeOptions is a holder of options @@ -26,9 +28,6 @@ func WithTopicPanicCallback(cb func(e interface{})) TopicComposeOption { // Compose returns a new Topic which has functional fields composed both from t and x. // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func (t *Topic) Compose(x *Topic, opts ...TopicComposeOption) *Topic { - if t == nil { - return x - } var ret Topic options := topicComposeOptions{} for _, opt := range opts { @@ -971,6 +970,44 @@ func (t *Topic) Compose(x *Topic, opts ...TopicComposeOption) *Topic { } } } + { + h1 := t.OnWriterSentGRPCMessage + h2 := x.OnWriterSentGRPCMessage + ret.OnWriterSentGRPCMessage = func(t TopicWriterSentGRPCMessageInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(t) + } + if h2 != nil { + h2(t) + } + } + } + { + h1 := t.OnWriterReceiveGRPCMessage + h2 := x.OnWriterReceiveGRPCMessage + ret.OnWriterReceiveGRPCMessage = func(t TopicWriterReceiveGRPCMessageInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(t) + } + if h2 != nil { + h2(t) + } + } + } { h1 := t.OnWriterReadUnknownGrpcMessage h2 := x.OnWriterReadUnknownGrpcMessage @@ -1395,6 +1432,20 @@ func (t *Topic) onWriterReceiveResult(t1 TopicWriterResultMessagesInfo) { } fn(t1) } +func (t *Topic) onWriterSentGRPCMessage(t1 TopicWriterSentGRPCMessageInfo) { + fn := t.OnWriterSentGRPCMessage + if fn == nil { + return + } + fn(t1) +} +func (t *Topic) onWriterReceiveGRPCMessage(t1 TopicWriterReceiveGRPCMessageInfo) { + fn := t.OnWriterReceiveGRPCMessage + if fn == nil { + return + } + fn(t1) +} func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMessageInfo) { fn := t.OnWriterReadUnknownGrpcMessage if fn == nil { @@ -1402,6 +1453,7 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe } fn(t1) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { var p TopicReaderStartInfo @@ -1410,6 +1462,7 @@ func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { p.Error = e t.onReaderStart(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnect(t *Topic, reason error) func(error) { var p TopicReaderReconnectStartInfo @@ -1421,6 +1474,7 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo @@ -1428,6 +1482,7 @@ func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { p.WasSent = wasSent t.onReaderReconnectRequest(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo @@ -1445,6 +1500,7 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo @@ -1462,6 +1518,7 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo @@ -1478,6 +1535,7 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo @@ -1489,6 +1547,7 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo @@ -1499,6 +1558,7 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str p.CommittedOffset = committedOffset t.onReaderCommittedNotify(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo @@ -1511,6 +1571,7 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo @@ -1524,6 +1585,7 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { var p TopicReaderErrorInfo @@ -1531,6 +1593,7 @@ func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { p.Error = e t.onReaderError(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo @@ -1548,6 +1611,7 @@ func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen } } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPopBatchTx(t *Topic, c *context.Context, readerID int64, transactionSessionID string, tx txInfo) func(startOffset int64, endOffset int64, messagesCount int, _ error) { var p TopicReaderPopBatchTxStartInfo @@ -1565,6 +1629,7 @@ func TopicOnReaderPopBatchTx(t *Topic, c *context.Context, readerID int64, trans res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderStreamPopBatchTx(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { var p TopicReaderStreamPopBatchTxStartInfo @@ -1580,6 +1645,7 @@ func TopicOnReaderStreamPopBatchTx(t *Topic, c *context.Context, readerID int64, res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateOffsetsInTransaction(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { var p TopicReaderOnUpdateOffsetsInTransactionStartInfo @@ -1595,6 +1661,7 @@ func TopicOnReaderUpdateOffsetsInTransaction(t *Topic, c *context.Context, reade res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderTransactionCompleted(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo, transactionResult error) func() { var p TopicReaderTransactionCompletedStartInfo @@ -1610,6 +1677,7 @@ func TopicOnReaderTransactionCompleted(t *Topic, c *context.Context, readerID in res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderTransactionRollback(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(rollbackError error) { var p TopicReaderTransactionRollbackStartInfo @@ -1625,6 +1693,7 @@ func TopicOnReaderTransactionRollback(t *Topic, c *context.Context, readerID int res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo @@ -1633,6 +1702,7 @@ func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBy p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo @@ -1646,6 +1716,7 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo @@ -1667,6 +1738,7 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo @@ -1674,6 +1746,7 @@ func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e erro p.Error = e t.onReaderUnknownGrpcMessage(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { var p TopicWriterReconnectStartInfo @@ -1693,6 +1766,7 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro } } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo @@ -1707,6 +1781,7 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo @@ -1719,6 +1794,7 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { var p TopicOnWriterBeforeCommitTransactionStartInfo @@ -1734,6 +1810,7 @@ func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSes res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, transactionID string) func(closeError error) { var p TopicOnWriterAfterFinishTransactionStartInfo @@ -1747,6 +1824,7 @@ func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, tr res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo @@ -1763,6 +1841,7 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo @@ -1778,6 +1857,7 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { var p TopicWriterResultMessagesInfo @@ -1787,6 +1867,29 @@ func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID str p.Acks = acks t.onWriterReceiveResult(p) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message protoreflect.ProtoMessage, e error) { + var p TopicWriterSentGRPCMessageInfo + p.TopicStreamInternalID = topicStreamInternalID + p.SessionID = sessionID + p.MessageNumber = messageNumber + p.Message = message + p.Error = e + t.onWriterSentGRPCMessage(p) +} + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message protoreflect.ProtoMessage, e error) { + var p TopicWriterReceiveGRPCMessageInfo + p.TopicStreamInternalID = topicStreamInternalID + p.SessionID = sessionID + p.MessageNumber = messageNumber + p.Message = message + p.Error = e + t.onWriterReceiveGRPCMessage(p) +} + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo From c66c90a03d40cf845fe59fd2f34f3f94d0e31add Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 7 Feb 2025 14:36:05 +0300 Subject: [PATCH 2/2] small fixes --- internal/grpcwrapper/rawtopic/client.go | 4 +- .../rawtopic/rawtopicwriter/streamwriter.go | 24 +++++++---- .../writer_reconnector_test.go | 2 +- log/topic.go | 14 ++++--- tests/integration/topic_read_writer_test.go | 2 +- trace/topic.go | 7 ++-- trace/topic_gtrace.go | 41 +++---------------- 7 files changed, 37 insertions(+), 57 deletions(-) diff --git a/internal/grpcwrapper/rawtopic/client.go b/internal/grpcwrapper/rawtopic/client.go index 530986dfa..203622e85 100644 --- a/internal/grpcwrapper/rawtopic/client.go +++ b/internal/grpcwrapper/rawtopic/client.go @@ -3,15 +3,15 @@ package rawtopic import ( "context" "fmt" - "github.com/google/uuid" - "github.com/ydb-platform/ydb-go-sdk/v3/trace" + "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) type Client struct { diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go index 9ab144dd5..920683c38 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go @@ -3,7 +3,6 @@ package rawtopicwriter import ( "errors" "fmt" - "github.com/ydb-platform/ydb-go-sdk/v3/trace" "reflect" "sync" "sync/atomic" @@ -15,6 +14,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var errConcurencyReadDenied = xerrors.Wrap(errors.New("ydb: read from rawtopicwriter in parallel")) @@ -38,6 +38,7 @@ type StreamWriter struct { sessionID string } +//nolint:funlen func (w *StreamWriter) Recv() (ServerMessage, error) { readCnt := atomic.AddInt32(&w.readCounter, 1) defer atomic.AddInt32(&w.readCounter, -1) @@ -46,22 +47,27 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { return nil, xerrors.WithStackTrace(errConcurencyReadDenied) } - grpcMsg, err := w.Stream.Recv() + grpcMsg, sendErr := w.Stream.Recv() w.readMessagesCount++ - trace.TopicOnWriterReceiveGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, err) - if err != nil { - if !xerrors.IsErrorFromServer(err) { - err = xerrors.Transport(err) + defer func() { + // defer needs for set good session id on first init response before trace the message + trace.TopicOnWriterReceiveGRPCMessage( + w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, + ) + }() + if sendErr != nil { + if !xerrors.IsErrorFromServer(sendErr) { + sendErr = xerrors.Transport(sendErr) } return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed to read grpc message from writer stream: %w", - err, + sendErr, ))) } var meta rawtopiccommon.ServerMessageMetadata - if err = meta.MetaFromStatusAndIssues(grpcMsg); err != nil { + if err := meta.MetaFromStatusAndIssues(grpcMsg); err != nil { return nil, err } if !meta.Status.IsSuccess() { @@ -79,7 +85,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { case *Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse: var res WriteResult res.ServerMessageMetadata = meta - err = res.fromProto(v.WriteResponse) + err := res.fromProto(v.WriteResponse) if err != nil { return nil, err } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 7bcf4a091..06c5eacbb 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -6,7 +6,6 @@ import ( "context" "errors" "fmt" - "github.com/ydb-platform/ydb-go-sdk/v3/trace" "io" "math" "sort" @@ -25,6 +24,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var testCommonEncoders = NewMultiEncoder() diff --git a/log/topic.go b/log/topic.go index 664d6391b..9773391b9 100644 --- a/log/topic.go +++ b/log/topic.go @@ -2,10 +2,11 @@ package log import ( "context" + "time" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" - "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/kv" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -948,17 +949,17 @@ func (s lazyProtoStringifer) String() string { Data []byte Metadata []*Ydb_Topic.MetadataItem } - storage := make([]messDataType, len(data.Messages)) - for i := range data.Messages { - storage[i].Data = data.Messages[i].Data + storage := make([]messDataType, len(data.GetMessages())) + for i := range data.GetMessages() { + storage[i].Data = data.GetMessages()[i].GetData() data.Messages[i] = nil - storage[i].Metadata = data.Messages[i].MetadataItems + storage[i].Metadata = data.GetMessages()[i].GetMetadataItems() data.Messages[i].MetadataItems = nil } defer func() { - for i := range data.Messages { + for i := range data.GetMessages() { data.Messages[i].Data = storage[i].Data data.Messages[i].MetadataItems = storage[i].Metadata } @@ -967,5 +968,6 @@ func (s lazyProtoStringifer) String() string { } res := protojson.MarshalOptions{AllowPartial: true}.Format(s.message) + return res } diff --git a/tests/integration/topic_read_writer_test.go b/tests/integration/topic_read_writer_test.go index b10e6c938..a5773a390 100644 --- a/tests/integration/topic_read_writer_test.go +++ b/tests/integration/topic_read_writer_test.go @@ -8,7 +8,6 @@ import ( "context" "encoding/binary" "errors" - "github.com/ydb-platform/ydb-go-sdk/v3/log" "io" "os" "runtime/pprof" @@ -27,6 +26,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/log" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" diff --git a/trace/topic.go b/trace/topic.go index 4819d4c18..00b41b81a 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -2,7 +2,8 @@ package trace import ( "context" - "google.golang.org/protobuf/proto" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" ) // tool gtrace used from ./internal/cmd/gtrace @@ -544,7 +545,7 @@ type ( TopicStreamInternalID string SessionID string MessageNumber int - Message proto.Message + Message *Ydb_Topic.StreamWriteMessage_FromClient Error error } @@ -553,7 +554,7 @@ type ( TopicStreamInternalID string SessionID string MessageNumber int - Message proto.Message + Message *Ydb_Topic.StreamWriteMessage_FromServer Error error } diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index f9ab97a71..f9c98db5b 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -5,7 +5,7 @@ package trace import ( "context" - "google.golang.org/protobuf/reflect/protoreflect" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" ) // topicComposeOptions is a holder of options @@ -28,6 +28,9 @@ func WithTopicPanicCallback(cb func(e interface{})) TopicComposeOption { // Compose returns a new Topic which has functional fields composed both from t and x. // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func (t *Topic) Compose(x *Topic, opts ...TopicComposeOption) *Topic { + if t == nil { + return x + } var ret Topic options := topicComposeOptions{} for _, opt := range opts { @@ -1453,7 +1456,6 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe } fn(t1) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { var p TopicReaderStartInfo @@ -1462,7 +1464,6 @@ func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { p.Error = e t.onReaderStart(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnect(t *Topic, reason error) func(error) { var p TopicReaderReconnectStartInfo @@ -1474,7 +1475,6 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo @@ -1482,7 +1482,6 @@ func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { p.WasSent = wasSent t.onReaderReconnectRequest(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo @@ -1500,7 +1499,6 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo @@ -1518,7 +1516,6 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo @@ -1535,7 +1532,6 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo @@ -1547,7 +1543,6 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo @@ -1558,7 +1553,6 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str p.CommittedOffset = committedOffset t.onReaderCommittedNotify(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo @@ -1571,7 +1565,6 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo @@ -1585,7 +1578,6 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { var p TopicReaderErrorInfo @@ -1593,7 +1585,6 @@ func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { p.Error = e t.onReaderError(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo @@ -1611,7 +1602,6 @@ func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen } } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPopBatchTx(t *Topic, c *context.Context, readerID int64, transactionSessionID string, tx txInfo) func(startOffset int64, endOffset int64, messagesCount int, _ error) { var p TopicReaderPopBatchTxStartInfo @@ -1629,7 +1619,6 @@ func TopicOnReaderPopBatchTx(t *Topic, c *context.Context, readerID int64, trans res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderStreamPopBatchTx(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { var p TopicReaderStreamPopBatchTxStartInfo @@ -1645,7 +1634,6 @@ func TopicOnReaderStreamPopBatchTx(t *Topic, c *context.Context, readerID int64, res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateOffsetsInTransaction(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { var p TopicReaderOnUpdateOffsetsInTransactionStartInfo @@ -1661,7 +1649,6 @@ func TopicOnReaderUpdateOffsetsInTransaction(t *Topic, c *context.Context, reade res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderTransactionCompleted(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo, transactionResult error) func() { var p TopicReaderTransactionCompletedStartInfo @@ -1677,7 +1664,6 @@ func TopicOnReaderTransactionCompleted(t *Topic, c *context.Context, readerID in res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderTransactionRollback(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(rollbackError error) { var p TopicReaderTransactionRollbackStartInfo @@ -1693,7 +1679,6 @@ func TopicOnReaderTransactionRollback(t *Topic, c *context.Context, readerID int res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo @@ -1702,7 +1687,6 @@ func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBy p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo @@ -1716,7 +1700,6 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo @@ -1738,7 +1721,6 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo @@ -1746,7 +1728,6 @@ func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e erro p.Error = e t.onReaderUnknownGrpcMessage(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { var p TopicWriterReconnectStartInfo @@ -1766,7 +1747,6 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro } } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo @@ -1781,7 +1761,6 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo @@ -1794,7 +1773,6 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { var p TopicOnWriterBeforeCommitTransactionStartInfo @@ -1810,7 +1788,6 @@ func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSes res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, transactionID string) func(closeError error) { var p TopicOnWriterAfterFinishTransactionStartInfo @@ -1824,7 +1801,6 @@ func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, tr res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo @@ -1841,7 +1817,6 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo @@ -1857,7 +1832,6 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { var p TopicWriterResultMessagesInfo @@ -1867,9 +1841,8 @@ func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID str p.Acks = acks t.onWriterReceiveResult(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message protoreflect.ProtoMessage, e error) { +func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { var p TopicWriterSentGRPCMessageInfo p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID @@ -1878,9 +1851,8 @@ func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessio p.Error = e t.onWriterSentGRPCMessage(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message protoreflect.ProtoMessage, e error) { +func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { var p TopicWriterReceiveGRPCMessageInfo p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID @@ -1889,7 +1861,6 @@ func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, ses p.Error = e t.onWriterReceiveGRPCMessage(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo