Skip to content

Commit

Permalink
Added log grpc messages metadata on trace log level for topic writer
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Feb 6, 2025
1 parent 7ec7b21 commit 569e38a
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 31 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added log grpc messages metadata on trace log level for topic writer

## v3.99.5
* Fixed error `Empty query text` using prepared statements and `ydb.WithExecuteDataQueryOverQueryClient(true)` option
* Prepared statements always send query text on Execute call from now (previous behaviour - send query ID)
Expand Down
13 changes: 11 additions & 2 deletions internal/grpcwrapper/rawtopic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package rawtopic
import (
"context"
"fmt"
"github.com/google/uuid"

Check failure on line 6 in internal/grpcwrapper/rawtopic/client.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)
"github.com/ydb-platform/ydb-go-sdk/v3/trace"

Check failure on line 8 in internal/grpcwrapper/rawtopic/client.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"

Expand Down Expand Up @@ -99,7 +101,10 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
return rawtopicreader.StreamReader{Stream: protoResp}, nil
}

func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter.StreamWriter, error) {
func (c *Client) StreamWrite(
ctxStreamLifeTime context.Context,
tracer *trace.Topic,
) (*rawtopicwriter.StreamWriter, error) {
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
if err != nil {
return nil, xerrors.WithStackTrace(
Expand All @@ -109,7 +114,11 @@ func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter
)
}

return &rawtopicwriter.StreamWriter{Stream: protoResp}, nil
return &rawtopicwriter.StreamWriter{
Stream: protoResp,
Tracer: tracer,
InternalStreamID: uuid.New().String(),
}, nil
}

func (c *Client) UpdateOffsetsInTransaction(
Expand Down
12 changes: 12 additions & 0 deletions internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rawtopicwriter
import (
"errors"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"

Check failure on line 6 in internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)
"reflect"
"sync"
"sync/atomic"
Expand All @@ -29,6 +30,12 @@ type StreamWriter struct {

sendCloseMtx sync.Mutex
Stream GrpcStream

Tracer *trace.Topic
InternalStreamID string
readMessagesCount int
writtenMessagesCount int
sessionID string
}

func (w *StreamWriter) Recv() (ServerMessage, error) {
Expand All @@ -40,6 +47,8 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
}

grpcMsg, err := w.Stream.Recv()
w.readMessagesCount++
trace.TopicOnWriterReceiveGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, err)
if err != nil {
if !xerrors.IsErrorFromServer(err) {
err = xerrors.Transport(err)
Expand All @@ -64,6 +73,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
var res InitResult
res.ServerMessageMetadata = meta
res.mustFromProto(v.InitResponse)
w.sessionID = res.SessionID

return &res, nil
case *Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse:
Expand Down Expand Up @@ -124,6 +134,8 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
}

err = w.Stream.Send(&protoMsg)
w.writtenMessagesCount++
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
}
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,11 @@ func (c *Client) createWriterConfig(
topicPath string,
opts []topicoptions.WriterOption,
) topicwriterinternal.WriterReconnectorConfig {
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context) (
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
topicwriterinternal.RawTopicWriterStream,
error,
) {
return c.rawClient.StreamWrite(ctx)
return c.rawClient.StreamWrite(ctx, tracer)
}

options := []topicoptions.WriterOption{
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (w *WriterReconnector) connectWithTimeout(streamLifetimeContext context.Con
}
}()

stream, err := w.cfg.Connect(connectCtx)
stream, err := w.cfg.Connect(connectCtx, w.cfg.Tracer)
resCh <- resT{stream: stream, err: err}
}()

Expand Down Expand Up @@ -789,7 +789,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *Multi
return res
}

type ConnectFunc func(ctx context.Context) (RawTopicWriterStream, error)
type ConnectFunc func(ctx context.Context, tracer *trace.Topic) (RawTopicWriterStream, error)

func createPublicCodecsFromRaw(codecs rawtopiccommon.SupportedCodecs) []topictypes.Codec {
res := make([]topictypes.Codec, len(codecs))
Expand Down
26 changes: 14 additions & 12 deletions internal/topic/topicwriterinternal/writer_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"

Check failure on line 9 in internal/topic/topicwriterinternal/writer_reconnector_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)
"io"
"math"
"sort"
Expand Down Expand Up @@ -446,7 +447,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
connectCalled := false
connectCalledChan := make(empty.Chan)

w.cfg.Connect = func(streamCtxArg context.Context) (RawTopicWriterStream, error) {
w.cfg.Connect = func(streamCtxArg context.Context, _ *trace.Topic) (RawTopicWriterStream, error) {
close(connectCalledChan)
connectCalled = true
require.NotEqual(t, ctx, streamCtxArg)
Expand Down Expand Up @@ -562,7 +563,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
}

var connectionAttempt atomic.Int64
w.cfg.Connect = func(ctx context.Context) (RawTopicWriterStream, error) {
w.cfg.Connect = func(ctx context.Context, _ *trace.Topic) (RawTopicWriterStream, error) {
attemptIndex := int(connectionAttempt.Add(1)) - 1
t.Logf("connect with attempt index: %v", attemptIndex)
res := connectsResult[attemptIndex]
Expand Down Expand Up @@ -1078,17 +1079,18 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv {
partitionID: 14,
}

writerOptions := append(defaultTestWriterOptions(), WithConnectFunc(func(ctx context.Context) (
RawTopicWriterStream,
error,
) {
connectNum := atomic.AddInt64(&res.connectCount, 1)
if connectNum > 1 {
t.Fatalf("test: default env support most one connection")
}
writerOptions := append(defaultTestWriterOptions(), WithConnectFunc(
func(ctx context.Context, _ *trace.Topic) (
RawTopicWriterStream,
error,
) {
connectNum := atomic.AddInt64(&res.connectCount, 1)
if connectNum > 1 {
t.Fatalf("test: default env support most one connection")
}

return res.stream, nil
}))
return res.stream, nil
}))
writerOptions = append(writerOptions, options.writerOptions...)

res.writer = newWriterReconnectorStopped(NewWriterReconnectorConfig(writerOptions...))
Expand Down
70 changes: 70 additions & 0 deletions log/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package log

import (
"context"

Check failure on line 4 in log/topic.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"time"

Check failure on line 8 in log/topic.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)

"github.com/ydb-platform/ydb-go-sdk/v3/internal/kv"
Expand Down Expand Up @@ -885,6 +888,39 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
kv.Version(),
)
}

t.OnWriterSentGRPCMessage = func(info trace.TopicWriterSentGRPCMessageInfo) {
if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 {
return
}

ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc")
l.Log(
ctx, "topic writer sent grpc message (message body and metadata are removed)",
kv.String("topic_stream_internal_id", info.TopicStreamInternalID),
kv.String("session_id", info.SessionID),
kv.Int("message_number", info.MessageNumber),
kv.Stringer("message", lazyProtoStringifer{info.Message}),
kv.Error(info.Error),
kv.Version(),
)
}
t.OnWriterReceiveGRPCMessage = func(info trace.TopicWriterReceiveGRPCMessageInfo) {
if d.Details()&trace.TopicWriterStreamGrpcMessageEvents == 0 {
return
}

ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc")
l.Log(
ctx, "topic writer received grpc message (message body and metadata are removed)",
kv.String("topic_stream_internal_id", info.TopicStreamInternalID),
kv.String("session_id", info.SessionID),
kv.Int("message_number", info.MessageNumber),
kv.Stringer("message", lazyProtoStringifer{info.Message}),
kv.Error(info.Error),
kv.Version(),
)
}
t.OnWriterReadUnknownGrpcMessage = func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) {
if d.Details()&trace.TopicWriterStreamEvents == 0 {
return
Expand All @@ -899,3 +935,37 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {

return t
}

type lazyProtoStringifer struct {
message proto.Message
}

func (s lazyProtoStringifer) String() string {
// cut message data
if writeRequest, ok := s.message.(*Ydb_Topic.StreamWriteMessage_FromClient); ok {
if data := writeRequest.GetWriteRequest(); data != nil {
type messDataType struct {
Data []byte
Metadata []*Ydb_Topic.MetadataItem
}
storage := make([]messDataType, len(data.Messages))
for i := range data.Messages {
storage[i].Data = data.Messages[i].Data
data.Messages[i] = nil

storage[i].Metadata = data.Messages[i].MetadataItems
data.Messages[i].MetadataItems = nil
}

defer func() {
for i := range data.Messages {
data.Messages[i].Data = storage[i].Data
data.Messages[i].MetadataItems = storage[i].Metadata
}
}()
}
}

res := protojson.MarshalOptions{AllowPartial: true}.Format(s.message)
return res
}
56 changes: 56 additions & 0 deletions tests/integration/topic_read_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/binary"
"errors"
"github.com/ydb-platform/ydb-go-sdk/v3/log"
"io"
"os"
"runtime/pprof"
Expand All @@ -33,6 +34,61 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

func TestTopicWriterLogMessagesWithoutData(t *testing.T) {
scope := newScope(t)

producerID := "dkeujsl"
const seqNoInt = 486812497
seqNoString := "486812497"
data := "kdjwkruowe"
metaKey := "gyoeexiufo"
metaValue := "fjedeikeosbv"

logs := &strings.Builder{}
writer, err := scope.Driver().Topic().StartWriter(
scope.TopicPath(),
topicoptions.WithWriterProducerID(producerID),
topicoptions.WithWriterSetAutoSeqNo(false),
topicoptions.WithWriterWaitServerAck(true),
topicoptions.WithWriterTrace(log.Topic(
log.Default(logs, log.WithMinLevel(log.TRACE)), trace.TopicWriterStreamGrpcMessageEvents),
),
)

scope.Require.NoError(err)
err = writer.Write(scope.Ctx,
topicwriter.Message{
SeqNo: seqNoInt,
Data: strings.NewReader(data),
Metadata: map[string][]byte{
metaKey: []byte(metaValue),
},
},
)
scope.Require.NoError(err)

err = writer.Close(scope.Ctx)
scope.Require.NoError(err)

logsString := logs.String()
scope.Require.Contains(logsString, producerID)
scope.Require.Contains(logsString, seqNoString)
scope.Require.NotContains(logsString, metaKey)
scope.Require.NotContains(logsString, metaValue)
scope.Require.NotContains(logsString, data)

mess, err := scope.TopicReader().ReadMessage(scope.Ctx)
scope.Require.NoError(err)

scope.Require.Equal(producerID, mess.ProducerID)
scope.Require.Equal(int64(seqNoInt), mess.SeqNo)
scope.Require.Equal(metaValue, string(mess.Metadata[metaKey]))

messData, err := io.ReadAll(mess)
scope.Require.NoError(err)
scope.Require.Equal(data, string(messData))
}

func TestSendAsyncMessages(t *testing.T) {
ctx := context.Background()
db := connect(t)
Expand Down
25 changes: 15 additions & 10 deletions trace/details.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (

TopicWriterStreamLifeCycleEvents
TopicWriterStreamEvents
TopicWriterStreamGrpcMessageEvents

DatabaseSQLConnectorEvents
DatabaseSQLConnEvents
Expand Down Expand Up @@ -116,7 +117,10 @@ const (
TopicReaderPartitionEvents |
TopicReaderStreamLifeCycleEvents

TopicEvents = TopicControlPlaneEvents | TopicReaderEvents
TopicWriterEvents = TopicWriterStreamLifeCycleEvents | TopicWriterStreamEvents |
TopicWriterStreamGrpcMessageEvents

TopicEvents = TopicControlPlaneEvents | TopicReaderEvents | TopicWriterEvents

DatabaseSQLEvents = DatabaseSQLConnectorEvents |
DatabaseSQLConnEvents |
Expand Down Expand Up @@ -169,15 +173,16 @@ var (
DatabaseSQLTxEvents: "ydb.database.sql.tx",
DatabaseSQLStmtEvents: "ydb.database.sql.stmt",

TopicEvents: "ydb.topic",
TopicControlPlaneEvents: "ydb.topic.controlplane",
TopicReaderEvents: "ydb.topic.reader",
TopicReaderStreamEvents: "ydb.topic.reader.stream",
TopicReaderMessageEvents: "ydb.topic.reader.message",
TopicReaderPartitionEvents: "ydb.topic.reader.partition",
TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle",
TopicWriterStreamLifeCycleEvents: "ydb.topic.writer.lifecycle",
TopicWriterStreamEvents: "ydb.topic.writer.stream",
TopicEvents: "ydb.topic",
TopicControlPlaneEvents: "ydb.topic.controlplane",
TopicReaderEvents: "ydb.topic.reader",
TopicReaderStreamEvents: "ydb.topic.reader.stream",
TopicReaderMessageEvents: "ydb.topic.reader.message",
TopicReaderPartitionEvents: "ydb.topic.reader.partition",
TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle",
TopicWriterStreamLifeCycleEvents: "ydb.topic.writer.lifecycle",
TopicWriterStreamEvents: "ydb.topic.writer.stream",
TopicWriterStreamGrpcMessageEvents: "ydb.topic.writer.grpc",
}
defaultDetails = DetailsAll
)
Expand Down
Loading

0 comments on commit 569e38a

Please sign in to comment.