diff --git a/CHANGELOG.md b/CHANGELOG.md index a9526ac97..8759dabaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added log grpc messages metadata on trace log level for topic writer + ## v3.99.5 * Fixed error `Empty query text` using prepared statements and `ydb.WithExecuteDataQueryOverQueryClient(true)` option * Prepared statements always send query text on Execute call from now (previous behaviour - send query ID) diff --git a/internal/grpcwrapper/rawtopic/client.go b/internal/grpcwrapper/rawtopic/client.go index 77fa4001e..203622e85 100644 --- a/internal/grpcwrapper/rawtopic/client.go +++ b/internal/grpcwrapper/rawtopic/client.go @@ -4,12 +4,14 @@ import ( "context" "fmt" + "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) type Client struct { @@ -99,7 +101,10 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S return rawtopicreader.StreamReader{Stream: protoResp}, nil } -func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter.StreamWriter, error) { +func (c *Client) StreamWrite( + ctxStreamLifeTime context.Context, + tracer *trace.Topic, +) (*rawtopicwriter.StreamWriter, error) { protoResp, err := c.service.StreamWrite(ctxStreamLifeTime) if err != nil { return nil, xerrors.WithStackTrace( @@ -109,7 +114,11 @@ func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter ) } - return &rawtopicwriter.StreamWriter{Stream: protoResp}, nil + return &rawtopicwriter.StreamWriter{ + Stream: protoResp, + Tracer: tracer, + InternalStreamID: uuid.New().String(), + }, nil } func (c *Client) UpdateOffsetsInTransaction( diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go index 8ff0b9727..920683c38 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go @@ -14,6 +14,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var errConcurencyReadDenied = xerrors.Wrap(errors.New("ydb: read from rawtopicwriter in parallel")) @@ -29,8 +30,15 @@ type StreamWriter struct { sendCloseMtx sync.Mutex Stream GrpcStream + + Tracer *trace.Topic + InternalStreamID string + readMessagesCount int + writtenMessagesCount int + sessionID string } +//nolint:funlen func (w *StreamWriter) Recv() (ServerMessage, error) { readCnt := atomic.AddInt32(&w.readCounter, 1) defer atomic.AddInt32(&w.readCounter, -1) @@ -39,20 +47,27 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { return nil, xerrors.WithStackTrace(errConcurencyReadDenied) } - grpcMsg, err := w.Stream.Recv() - if err != nil { - if !xerrors.IsErrorFromServer(err) { - err = xerrors.Transport(err) + grpcMsg, sendErr := w.Stream.Recv() + w.readMessagesCount++ + defer func() { + // defer needs for set good session id on first init response before trace the message + trace.TopicOnWriterReceiveGRPCMessage( + w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, + ) + }() + if sendErr != nil { + if !xerrors.IsErrorFromServer(sendErr) { + sendErr = xerrors.Transport(sendErr) } return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed to read grpc message from writer stream: %w", - err, + sendErr, ))) } var meta rawtopiccommon.ServerMessageMetadata - if err = meta.MetaFromStatusAndIssues(grpcMsg); err != nil { + if err := meta.MetaFromStatusAndIssues(grpcMsg); err != nil { return nil, err } if !meta.Status.IsSuccess() { @@ -64,12 +79,13 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { var res InitResult res.ServerMessageMetadata = meta res.mustFromProto(v.InitResponse) + w.sessionID = res.SessionID return &res, nil case *Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse: var res WriteResult res.ServerMessageMetadata = meta - err = res.fromProto(v.WriteResponse) + err := res.fromProto(v.WriteResponse) if err != nil { return nil, err } @@ -124,6 +140,8 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) { } err = w.Stream.Send(&protoMsg) + w.writtenMessagesCount++ + trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err))) } diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 60050c26e..cb4672c25 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -356,11 +356,11 @@ func (c *Client) createWriterConfig( topicPath string, opts []topicoptions.WriterOption, ) topicwriterinternal.WriterReconnectorConfig { - var connector topicwriterinternal.ConnectFunc = func(ctx context.Context) ( + var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( topicwriterinternal.RawTopicWriterStream, error, ) { - return c.rawClient.StreamWrite(ctx) + return c.rawClient.StreamWrite(ctx, tracer) } options := []topicoptions.WriterOption{ diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 2491e5a0c..217652cd7 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -517,7 +517,7 @@ func (w *WriterReconnector) connectWithTimeout(streamLifetimeContext context.Con } }() - stream, err := w.cfg.Connect(connectCtx) + stream, err := w.cfg.Connect(connectCtx, w.cfg.Tracer) resCh <- resT{stream: stream, err: err} }() @@ -789,7 +789,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *Multi return res } -type ConnectFunc func(ctx context.Context) (RawTopicWriterStream, error) +type ConnectFunc func(ctx context.Context, tracer *trace.Topic) (RawTopicWriterStream, error) func createPublicCodecsFromRaw(codecs rawtopiccommon.SupportedCodecs) []topictypes.Codec { res := make([]topictypes.Codec, len(codecs)) diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 405265399..06c5eacbb 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -24,6 +24,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var testCommonEncoders = NewMultiEncoder() @@ -446,7 +447,7 @@ func TestWriterImpl_Reconnect(t *testing.T) { connectCalled := false connectCalledChan := make(empty.Chan) - w.cfg.Connect = func(streamCtxArg context.Context) (RawTopicWriterStream, error) { + w.cfg.Connect = func(streamCtxArg context.Context, _ *trace.Topic) (RawTopicWriterStream, error) { close(connectCalledChan) connectCalled = true require.NotEqual(t, ctx, streamCtxArg) @@ -562,7 +563,7 @@ func TestWriterImpl_Reconnect(t *testing.T) { } var connectionAttempt atomic.Int64 - w.cfg.Connect = func(ctx context.Context) (RawTopicWriterStream, error) { + w.cfg.Connect = func(ctx context.Context, _ *trace.Topic) (RawTopicWriterStream, error) { attemptIndex := int(connectionAttempt.Add(1)) - 1 t.Logf("connect with attempt index: %v", attemptIndex) res := connectsResult[attemptIndex] @@ -1078,17 +1079,18 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv { partitionID: 14, } - writerOptions := append(defaultTestWriterOptions(), WithConnectFunc(func(ctx context.Context) ( - RawTopicWriterStream, - error, - ) { - connectNum := atomic.AddInt64(&res.connectCount, 1) - if connectNum > 1 { - t.Fatalf("test: default env support most one connection") - } + writerOptions := append(defaultTestWriterOptions(), WithConnectFunc( + func(ctx context.Context, _ *trace.Topic) ( + RawTopicWriterStream, + error, + ) { + connectNum := atomic.AddInt64(&res.connectCount, 1) + if connectNum > 1 { + t.Fatalf("test: default env support most one connection") + } - return res.stream, nil - })) + return res.stream, nil + })) writerOptions = append(writerOptions, options.writerOptions...) res.writer = newWriterReconnectorStopped(NewWriterReconnectorConfig(writerOptions...)) diff --git a/log/topic.go b/log/topic.go index 2d1b8164d..9773391b9 100644 --- a/log/topic.go +++ b/log/topic.go @@ -4,6 +4,10 @@ import ( "context" "time" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/kv" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -885,6 +889,39 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { kv.Version(), ) } + + t.OnWriterSentGRPCMessage = func(info trace.TopicWriterSentGRPCMessageInfo) { + if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 { + return + } + + ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + l.Log( + ctx, "topic writer sent grpc message (message body and metadata are removed)", + kv.String("topic_stream_internal_id", info.TopicStreamInternalID), + kv.String("session_id", info.SessionID), + kv.Int("message_number", info.MessageNumber), + kv.Stringer("message", lazyProtoStringifer{info.Message}), + kv.Error(info.Error), + kv.Version(), + ) + } + t.OnWriterReceiveGRPCMessage = func(info trace.TopicWriterReceiveGRPCMessageInfo) { + if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 { + return + } + + ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + l.Log( + ctx, "topic writer received grpc message (message body and metadata are removed)", + kv.String("topic_stream_internal_id", info.TopicStreamInternalID), + kv.String("session_id", info.SessionID), + kv.Int("message_number", info.MessageNumber), + kv.Stringer("message", lazyProtoStringifer{info.Message}), + kv.Error(info.Error), + kv.Version(), + ) + } t.OnWriterReadUnknownGrpcMessage = func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return @@ -899,3 +936,38 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return t } + +type lazyProtoStringifer struct { + message proto.Message +} + +func (s lazyProtoStringifer) String() string { + // cut message data + if writeRequest, ok := s.message.(*Ydb_Topic.StreamWriteMessage_FromClient); ok { + if data := writeRequest.GetWriteRequest(); data != nil { + type messDataType struct { + Data []byte + Metadata []*Ydb_Topic.MetadataItem + } + storage := make([]messDataType, len(data.GetMessages())) + for i := range data.GetMessages() { + storage[i].Data = data.GetMessages()[i].GetData() + data.Messages[i] = nil + + storage[i].Metadata = data.GetMessages()[i].GetMetadataItems() + data.Messages[i].MetadataItems = nil + } + + defer func() { + for i := range data.GetMessages() { + data.Messages[i].Data = storage[i].Data + data.Messages[i].MetadataItems = storage[i].Metadata + } + }() + } + } + + res := protojson.MarshalOptions{AllowPartial: true}.Format(s.message) + + return res +} diff --git a/tests/integration/topic_read_writer_test.go b/tests/integration/topic_read_writer_test.go index 19f440b4b..a5773a390 100644 --- a/tests/integration/topic_read_writer_test.go +++ b/tests/integration/topic_read_writer_test.go @@ -26,6 +26,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/log" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" @@ -33,6 +34,61 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +func TestTopicWriterLogMessagesWithoutData(t *testing.T) { + scope := newScope(t) + + producerID := "dkeujsl" + const seqNoInt = 486812497 + seqNoString := "486812497" + data := "kdjwkruowe" + metaKey := "gyoeexiufo" + metaValue := "fjedeikeosbv" + + logs := &strings.Builder{} + writer, err := scope.Driver().Topic().StartWriter( + scope.TopicPath(), + topicoptions.WithWriterProducerID(producerID), + topicoptions.WithWriterSetAutoSeqNo(false), + topicoptions.WithWriterWaitServerAck(true), + topicoptions.WithWriterTrace(log.Topic( + log.Default(logs, log.WithMinLevel(log.TRACE)), trace.TopicWriterStreamGrpcMessageEvents), + ), + ) + + scope.Require.NoError(err) + err = writer.Write(scope.Ctx, + topicwriter.Message{ + SeqNo: seqNoInt, + Data: strings.NewReader(data), + Metadata: map[string][]byte{ + metaKey: []byte(metaValue), + }, + }, + ) + scope.Require.NoError(err) + + err = writer.Close(scope.Ctx) + scope.Require.NoError(err) + + logsString := logs.String() + scope.Require.Contains(logsString, producerID) + scope.Require.Contains(logsString, seqNoString) + scope.Require.NotContains(logsString, metaKey) + scope.Require.NotContains(logsString, metaValue) + scope.Require.NotContains(logsString, data) + + mess, err := scope.TopicReader().ReadMessage(scope.Ctx) + scope.Require.NoError(err) + + scope.Require.Equal(producerID, mess.ProducerID) + scope.Require.Equal(int64(seqNoInt), mess.SeqNo) + scope.Require.Equal(metaValue, string(mess.Metadata[metaKey])) + + messData, err := io.ReadAll(mess) + scope.Require.NoError(err) + scope.Require.Equal(data, string(messData)) +} + func TestSendAsyncMessages(t *testing.T) { ctx := context.Background() db := connect(t) diff --git a/trace/details.go b/trace/details.go index 4b335b748..1d7fdf560 100644 --- a/trace/details.go +++ b/trace/details.go @@ -64,6 +64,7 @@ const ( TopicWriterStreamLifeCycleEvents TopicWriterStreamEvents + TopicWriterStreamGrpcMessageEvents DatabaseSQLConnectorEvents DatabaseSQLConnEvents @@ -116,7 +117,10 @@ const ( TopicReaderPartitionEvents | TopicReaderStreamLifeCycleEvents - TopicEvents = TopicControlPlaneEvents | TopicReaderEvents + TopicWriterEvents = TopicWriterStreamLifeCycleEvents | TopicWriterStreamEvents | + TopicWriterStreamGrpcMessageEvents + + TopicEvents = TopicControlPlaneEvents | TopicReaderEvents | TopicWriterEvents DatabaseSQLEvents = DatabaseSQLConnectorEvents | DatabaseSQLConnEvents | @@ -169,15 +173,16 @@ var ( DatabaseSQLTxEvents: "ydb.database.sql.tx", DatabaseSQLStmtEvents: "ydb.database.sql.stmt", - TopicEvents: "ydb.topic", - TopicControlPlaneEvents: "ydb.topic.controlplane", - TopicReaderEvents: "ydb.topic.reader", - TopicReaderStreamEvents: "ydb.topic.reader.stream", - TopicReaderMessageEvents: "ydb.topic.reader.message", - TopicReaderPartitionEvents: "ydb.topic.reader.partition", - TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle", - TopicWriterStreamLifeCycleEvents: "ydb.topic.writer.lifecycle", - TopicWriterStreamEvents: "ydb.topic.writer.stream", + TopicEvents: "ydb.topic", + TopicControlPlaneEvents: "ydb.topic.controlplane", + TopicReaderEvents: "ydb.topic.reader", + TopicReaderStreamEvents: "ydb.topic.reader.stream", + TopicReaderMessageEvents: "ydb.topic.reader.message", + TopicReaderPartitionEvents: "ydb.topic.reader.partition", + TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle", + TopicWriterStreamLifeCycleEvents: "ydb.topic.writer.lifecycle", + TopicWriterStreamEvents: "ydb.topic.writer.stream", + TopicWriterStreamGrpcMessageEvents: "ydb.topic.writer.grpc", } defaultDetails = DetailsAll ) diff --git a/trace/topic.go b/trace/topic.go index 7d68906e7..00b41b81a 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -2,6 +2,8 @@ package trace import ( "context" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" ) // tool gtrace used from ./internal/cmd/gtrace @@ -133,6 +135,10 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnWriterReceiveResult func(TopicWriterResultMessagesInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnWriterSentGRPCMessage func(TopicWriterSentGRPCMessageInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnWriterReceiveGRPCMessage func(TopicWriterReceiveGRPCMessageInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnWriterReadUnknownGrpcMessage func(TopicOnWriterReadUnknownGrpcMessageInfo) } @@ -534,6 +540,24 @@ type ( CloseError error } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TopicWriterSentGRPCMessageInfo struct { + TopicStreamInternalID string + SessionID string + MessageNumber int + Message *Ydb_Topic.StreamWriteMessage_FromClient + Error error + } + + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TopicWriterReceiveGRPCMessageInfo struct { + TopicStreamInternalID string + SessionID string + MessageNumber int + Message *Ydb_Topic.StreamWriteMessage_FromServer + Error error + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterReadUnknownGrpcMessageInfo struct { WriterInstanceID string diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index 5733d21a1..f9c98db5b 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -4,6 +4,8 @@ package trace import ( "context" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" ) // topicComposeOptions is a holder of options @@ -971,6 +973,44 @@ func (t *Topic) Compose(x *Topic, opts ...TopicComposeOption) *Topic { } } } + { + h1 := t.OnWriterSentGRPCMessage + h2 := x.OnWriterSentGRPCMessage + ret.OnWriterSentGRPCMessage = func(t TopicWriterSentGRPCMessageInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(t) + } + if h2 != nil { + h2(t) + } + } + } + { + h1 := t.OnWriterReceiveGRPCMessage + h2 := x.OnWriterReceiveGRPCMessage + ret.OnWriterReceiveGRPCMessage = func(t TopicWriterReceiveGRPCMessageInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(t) + } + if h2 != nil { + h2(t) + } + } + } { h1 := t.OnWriterReadUnknownGrpcMessage h2 := x.OnWriterReadUnknownGrpcMessage @@ -1395,6 +1435,20 @@ func (t *Topic) onWriterReceiveResult(t1 TopicWriterResultMessagesInfo) { } fn(t1) } +func (t *Topic) onWriterSentGRPCMessage(t1 TopicWriterSentGRPCMessageInfo) { + fn := t.OnWriterSentGRPCMessage + if fn == nil { + return + } + fn(t1) +} +func (t *Topic) onWriterReceiveGRPCMessage(t1 TopicWriterReceiveGRPCMessageInfo) { + fn := t.OnWriterReceiveGRPCMessage + if fn == nil { + return + } + fn(t1) +} func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMessageInfo) { fn := t.OnWriterReadUnknownGrpcMessage if fn == nil { @@ -1788,6 +1842,26 @@ func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID str t.onWriterReceiveResult(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { + var p TopicWriterSentGRPCMessageInfo + p.TopicStreamInternalID = topicStreamInternalID + p.SessionID = sessionID + p.MessageNumber = messageNumber + p.Message = message + p.Error = e + t.onWriterSentGRPCMessage(p) +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { + var p TopicWriterReceiveGRPCMessageInfo + p.TopicStreamInternalID = topicStreamInternalID + p.SessionID = sessionID + p.MessageNumber = messageNumber + p.Message = message + p.Error = e + t.onWriterReceiveGRPCMessage(p) +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo p.WriterInstanceID = writerInstanceID