Skip to content

Commit

Permalink
Merge pull request #1638 Added log grpc messages metadata on trace lo…
Browse files Browse the repository at this point in the history
…g level for topic writer
  • Loading branch information
rekby authored Feb 7, 2025
2 parents 7ec7b21 + c66c90a commit 68d4894
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added log grpc messages metadata on trace log level for topic writer

## v3.99.5
* Fixed error `Empty query text` using prepared statements and `ydb.WithExecuteDataQueryOverQueryClient(true)` option
* Prepared statements always send query text on Execute call from now (previous behaviour - send query ID)
Expand Down
13 changes: 11 additions & 2 deletions internal/grpcwrapper/rawtopic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"fmt"

"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

type Client struct {
Expand Down Expand Up @@ -99,7 +101,10 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
return rawtopicreader.StreamReader{Stream: protoResp}, nil
}

func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter.StreamWriter, error) {
func (c *Client) StreamWrite(
ctxStreamLifeTime context.Context,
tracer *trace.Topic,
) (*rawtopicwriter.StreamWriter, error) {
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
if err != nil {
return nil, xerrors.WithStackTrace(
Expand All @@ -109,7 +114,11 @@ func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter
)
}

return &rawtopicwriter.StreamWriter{Stream: protoResp}, nil
return &rawtopicwriter.StreamWriter{
Stream: protoResp,
Tracer: tracer,
InternalStreamID: uuid.New().String(),
}, nil
}

func (c *Client) UpdateOffsetsInTransaction(
Expand Down
32 changes: 25 additions & 7 deletions internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var errConcurencyReadDenied = xerrors.Wrap(errors.New("ydb: read from rawtopicwriter in parallel"))
Expand All @@ -29,8 +30,15 @@ type StreamWriter struct {

sendCloseMtx sync.Mutex
Stream GrpcStream

Tracer *trace.Topic
InternalStreamID string
readMessagesCount int
writtenMessagesCount int
sessionID string
}

//nolint:funlen
func (w *StreamWriter) Recv() (ServerMessage, error) {
readCnt := atomic.AddInt32(&w.readCounter, 1)
defer atomic.AddInt32(&w.readCounter, -1)
Expand All @@ -39,20 +47,27 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
return nil, xerrors.WithStackTrace(errConcurencyReadDenied)
}

grpcMsg, err := w.Stream.Recv()
if err != nil {
if !xerrors.IsErrorFromServer(err) {
err = xerrors.Transport(err)
grpcMsg, sendErr := w.Stream.Recv()
w.readMessagesCount++
defer func() {
// defer needs for set good session id on first init response before trace the message
trace.TopicOnWriterReceiveGRPCMessage(
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
)
}()
if sendErr != nil {
if !xerrors.IsErrorFromServer(sendErr) {
sendErr = xerrors.Transport(sendErr)
}

return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed to read grpc message from writer stream: %w",
err,
sendErr,
)))
}

var meta rawtopiccommon.ServerMessageMetadata
if err = meta.MetaFromStatusAndIssues(grpcMsg); err != nil {
if err := meta.MetaFromStatusAndIssues(grpcMsg); err != nil {
return nil, err
}
if !meta.Status.IsSuccess() {
Expand All @@ -64,12 +79,13 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
var res InitResult
res.ServerMessageMetadata = meta
res.mustFromProto(v.InitResponse)
w.sessionID = res.SessionID

return &res, nil
case *Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse:
var res WriteResult
res.ServerMessageMetadata = meta
err = res.fromProto(v.WriteResponse)
err := res.fromProto(v.WriteResponse)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -124,6 +140,8 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
}

err = w.Stream.Send(&protoMsg)
w.writtenMessagesCount++
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
}
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,11 @@ func (c *Client) createWriterConfig(
topicPath string,
opts []topicoptions.WriterOption,
) topicwriterinternal.WriterReconnectorConfig {
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context) (
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
topicwriterinternal.RawTopicWriterStream,
error,
) {
return c.rawClient.StreamWrite(ctx)
return c.rawClient.StreamWrite(ctx, tracer)
}

options := []topicoptions.WriterOption{
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (w *WriterReconnector) connectWithTimeout(streamLifetimeContext context.Con
}
}()

stream, err := w.cfg.Connect(connectCtx)
stream, err := w.cfg.Connect(connectCtx, w.cfg.Tracer)
resCh <- resT{stream: stream, err: err}
}()

Expand Down Expand Up @@ -789,7 +789,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *Multi
return res
}

type ConnectFunc func(ctx context.Context) (RawTopicWriterStream, error)
type ConnectFunc func(ctx context.Context, tracer *trace.Topic) (RawTopicWriterStream, error)

func createPublicCodecsFromRaw(codecs rawtopiccommon.SupportedCodecs) []topictypes.Codec {
res := make([]topictypes.Codec, len(codecs))
Expand Down
26 changes: 14 additions & 12 deletions internal/topic/topicwriterinternal/writer_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var testCommonEncoders = NewMultiEncoder()
Expand Down Expand Up @@ -446,7 +447,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
connectCalled := false
connectCalledChan := make(empty.Chan)

w.cfg.Connect = func(streamCtxArg context.Context) (RawTopicWriterStream, error) {
w.cfg.Connect = func(streamCtxArg context.Context, _ *trace.Topic) (RawTopicWriterStream, error) {
close(connectCalledChan)
connectCalled = true
require.NotEqual(t, ctx, streamCtxArg)
Expand Down Expand Up @@ -562,7 +563,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
}

var connectionAttempt atomic.Int64
w.cfg.Connect = func(ctx context.Context) (RawTopicWriterStream, error) {
w.cfg.Connect = func(ctx context.Context, _ *trace.Topic) (RawTopicWriterStream, error) {
attemptIndex := int(connectionAttempt.Add(1)) - 1
t.Logf("connect with attempt index: %v", attemptIndex)
res := connectsResult[attemptIndex]
Expand Down Expand Up @@ -1078,17 +1079,18 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv {
partitionID: 14,
}

writerOptions := append(defaultTestWriterOptions(), WithConnectFunc(func(ctx context.Context) (
RawTopicWriterStream,
error,
) {
connectNum := atomic.AddInt64(&res.connectCount, 1)
if connectNum > 1 {
t.Fatalf("test: default env support most one connection")
}
writerOptions := append(defaultTestWriterOptions(), WithConnectFunc(
func(ctx context.Context, _ *trace.Topic) (
RawTopicWriterStream,
error,
) {
connectNum := atomic.AddInt64(&res.connectCount, 1)
if connectNum > 1 {
t.Fatalf("test: default env support most one connection")
}

return res.stream, nil
}))
return res.stream, nil
}))
writerOptions = append(writerOptions, options.writerOptions...)

res.writer = newWriterReconnectorStopped(NewWriterReconnectorConfig(writerOptions...))
Expand Down
72 changes: 72 additions & 0 deletions log/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/kv"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)
Expand Down Expand Up @@ -885,6 +889,39 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
kv.Version(),
)
}

t.OnWriterSentGRPCMessage = func(info trace.TopicWriterSentGRPCMessageInfo) {
if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 {
return
}

ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc")
l.Log(
ctx, "topic writer sent grpc message (message body and metadata are removed)",
kv.String("topic_stream_internal_id", info.TopicStreamInternalID),
kv.String("session_id", info.SessionID),
kv.Int("message_number", info.MessageNumber),
kv.Stringer("message", lazyProtoStringifer{info.Message}),
kv.Error(info.Error),
kv.Version(),
)
}
t.OnWriterReceiveGRPCMessage = func(info trace.TopicWriterReceiveGRPCMessageInfo) {
if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 {
return
}

ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc")
l.Log(
ctx, "topic writer received grpc message (message body and metadata are removed)",
kv.String("topic_stream_internal_id", info.TopicStreamInternalID),
kv.String("session_id", info.SessionID),
kv.Int("message_number", info.MessageNumber),
kv.Stringer("message", lazyProtoStringifer{info.Message}),
kv.Error(info.Error),
kv.Version(),
)
}
t.OnWriterReadUnknownGrpcMessage = func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) {
if d.Details()&trace.TopicWriterStreamEvents == 0 {
return
Expand All @@ -899,3 +936,38 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {

return t
}

type lazyProtoStringifer struct {
message proto.Message
}

func (s lazyProtoStringifer) String() string {
// cut message data
if writeRequest, ok := s.message.(*Ydb_Topic.StreamWriteMessage_FromClient); ok {
if data := writeRequest.GetWriteRequest(); data != nil {
type messDataType struct {
Data []byte
Metadata []*Ydb_Topic.MetadataItem
}
storage := make([]messDataType, len(data.GetMessages()))
for i := range data.GetMessages() {
storage[i].Data = data.GetMessages()[i].GetData()
data.Messages[i] = nil

storage[i].Metadata = data.GetMessages()[i].GetMetadataItems()
data.Messages[i].MetadataItems = nil
}

defer func() {
for i := range data.GetMessages() {
data.Messages[i].Data = storage[i].Data
data.Messages[i].MetadataItems = storage[i].Metadata
}
}()
}
}

res := protojson.MarshalOptions{AllowPartial: true}.Format(s.message)

return res
}
56 changes: 56 additions & 0 deletions tests/integration/topic_read_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,69 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/version"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
"github.com/ydb-platform/ydb-go-sdk/v3/log"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

func TestTopicWriterLogMessagesWithoutData(t *testing.T) {
scope := newScope(t)

producerID := "dkeujsl"
const seqNoInt = 486812497
seqNoString := "486812497"
data := "kdjwkruowe"
metaKey := "gyoeexiufo"
metaValue := "fjedeikeosbv"

logs := &strings.Builder{}
writer, err := scope.Driver().Topic().StartWriter(
scope.TopicPath(),
topicoptions.WithWriterProducerID(producerID),
topicoptions.WithWriterSetAutoSeqNo(false),
topicoptions.WithWriterWaitServerAck(true),
topicoptions.WithWriterTrace(log.Topic(
log.Default(logs, log.WithMinLevel(log.TRACE)), trace.TopicWriterStreamGrpcMessageEvents),
),
)

scope.Require.NoError(err)
err = writer.Write(scope.Ctx,
topicwriter.Message{
SeqNo: seqNoInt,
Data: strings.NewReader(data),
Metadata: map[string][]byte{
metaKey: []byte(metaValue),
},
},
)
scope.Require.NoError(err)

err = writer.Close(scope.Ctx)
scope.Require.NoError(err)

logsString := logs.String()
scope.Require.Contains(logsString, producerID)
scope.Require.Contains(logsString, seqNoString)
scope.Require.NotContains(logsString, metaKey)
scope.Require.NotContains(logsString, metaValue)
scope.Require.NotContains(logsString, data)

mess, err := scope.TopicReader().ReadMessage(scope.Ctx)
scope.Require.NoError(err)

scope.Require.Equal(producerID, mess.ProducerID)
scope.Require.Equal(int64(seqNoInt), mess.SeqNo)
scope.Require.Equal(metaValue, string(mess.Metadata[metaKey]))

messData, err := io.ReadAll(mess)
scope.Require.NoError(err)
scope.Require.Equal(data, string(messData))
}

func TestSendAsyncMessages(t *testing.T) {
ctx := context.Background()
db := connect(t)
Expand Down
Loading

0 comments on commit 68d4894

Please sign in to comment.