diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index ea7d2e815db31..db78853f95074 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -91,10 +91,9 @@ func (ddn *ddNode) IsValidInMsg(in []Msg) bool {
 
 // Operate handles input messages, implementing flowgrpah.Node
 func (ddn *ddNode) Operate(in []Msg) []Msg {
-    log := log.With(zap.String("channel", ddn.vChannelName))
     msMsg, ok := in[0].(*MsgStreamMsg)
     if !ok {
-        log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
+        log.Warn("type assertion failed for MsgStreamMsg", zap.String("channel", ddn.vChannelName), zap.String("name", reflect.TypeOf(in[0]).Name()))
         return []Msg{}
     }
 
@@ -110,12 +109,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
             endPositions:   msMsg.EndPositions(),
             dropCollection: false,
         }
-        log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID))
+        log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.String("channel", ddn.vChannelName), zap.Int64("collection", ddn.collectionID))
         return []Msg{&fgMsg}
     }
 
     if load := ddn.dropMode.Load(); load != nil && load.(bool) {
-        log.RatedInfo(1.0, "ddNode in dropMode")
+        log.RatedInfo(1.0, "ddNode in dropMode", zap.String("channel", ddn.vChannelName))
         return []Msg{}
     }
 
@@ -146,10 +145,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
         switch msg.Type() {
         case commonpb.MsgType_DropCollection:
             if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
-                log.Info("Receiving DropCollection msg")
+                log.Info("Receiving DropCollection msg", zap.String("channel", ddn.vChannelName))
                 ddn.dropMode.Store(true)
 
-                log.Info("Stop compaction for dropped channel")
+                log.Info("Stop compaction for dropped channel", zap.String("channel", ddn.vChannelName))
                 ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName)
                 fgMsg.dropCollection = true
             }
@@ -157,7 +156,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
         case commonpb.MsgType_DropPartition:
             dpMsg := msg.(*msgstream.DropPartitionMsg)
             if dpMsg.GetCollectionID() == ddn.collectionID {
-                log.Info("drop partition msg received", zap.Int64("partitionID", dpMsg.GetPartitionID()))
+                log.Info("drop partition msg received", zap.String("channel", ddn.vChannelName), zap.Int64("partitionID", dpMsg.GetPartitionID()))
                 fgMsg.dropPartitions = append(fgMsg.dropPartitions, dpMsg.PartitionID)
             }
 
@@ -166,6 +165,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
             if imsg.CollectionID != ddn.collectionID {
                 log.Warn("filter invalid insert message, collection mis-match",
                     zap.Int64("Get collID", imsg.CollectionID),
+                    zap.String("channel", ddn.vChannelName),
                     zap.Int64("Expected collID", ddn.collectionID))
                 continue
             }
@@ -173,6 +173,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
             if ddn.tryToFilterSegmentInsertMessages(imsg) {
                 log.Debug("filter insert messages",
                     zap.Int64("filter segmentID", imsg.GetSegmentID()),
+                    zap.String("channel", ddn.vChannelName),
                     zap.Uint64("message timestamp", msg.EndTs()),
                 )
                 continue
             }
@@ -194,6 +195,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 
             log.Debug("DDNode receive insert messages",
                 zap.Int64("segmentID", imsg.GetSegmentID()),
+                zap.String("channel", ddn.vChannelName),
                 zap.Int("numRows", len(imsg.GetRowIDs())))
             fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)
 
@@ -203,11 +205,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
             if dmsg.CollectionID != ddn.collectionID {
                 log.Warn("filter invalid DeleteMsg, collection mis-match",
                     zap.Int64("Get collID", dmsg.CollectionID),
+                    zap.String("channel", ddn.vChannelName),
                     zap.Int64("Expected collID", ddn.collectionID))
                 continue
             }
 
-            log.Debug("DDNode receive delete messages", zap.Int64("numRows", dmsg.NumRows))
+            log.Debug("DDNode receive delete messages", zap.String("channel", ddn.vChannelName), zap.Int64("numRows", dmsg.NumRows))
             rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))
 
             metrics.DataNodeConsumeBytesCount.
diff --git a/internal/datanode/timetick_sender.go b/internal/datanode/timetick_sender.go
index 145e60aec8cb8..ecce410c05501 100644
--- a/internal/datanode/timetick_sender.go
+++ b/internal/datanode/timetick_sender.go
@@ -148,7 +148,6 @@ func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) {
     m.mu.Lock()
     defer m.mu.Unlock()
     sizeBeforeClean := len(m.statsCache)
-    log := log.With(zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean))
     for channelName, lastSentTs := range lastSentTss {
         _, ok := m.statsCache[channelName]
         if ok {
@@ -162,7 +161,7 @@ func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) {
             delete(m.statsCache, channelName)
         }
     }
-    log.RatedDebug(30, "timeTickSender stats", zap.Int("sizeAfterClean", len(m.statsCache)))
+    log.RatedDebug(30, "timeTickSender stats", zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean), zap.Int("sizeAfterClean", len(m.statsCache)))
 }
 
 func (m *timeTickSender) sendReport(ctx context.Context) error {
diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go
index 8456fb7ac2957..3dbd8df5ec7d7 100644
--- a/internal/datanode/writebuffer/write_buffer.go
+++ b/internal/datanode/writebuffer/write_buffer.go
@@ -240,7 +240,7 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
     switch {
     case bufferCandidate == nil && syncCandidate == nil:
         // all buffer are empty
-        log.RatedInfo(60, "checkpoint from latest consumed msg")
+        log.RatedDebug(60, "checkpoint from latest consumed msg")
         return wb.checkpoint
     case bufferCandidate == nil && syncCandidate != nil:
         checkpoint = syncCandidate
@@ -260,7 +260,7 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
         cpSource = "syncManager"
     }
 
-    log.RatedInfo(20, "checkpoint evaluated",
+    log.RatedDebug(20, "checkpoint evaluated",
         zap.String("cpSource", cpSource),
         zap.Int64("segmentID", segmentID),
         zap.Uint64("cpTimestamp", checkpoint.GetTimestamp()))
diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go
index 24eeff9b4e248..eed9850025639 100644
--- a/internal/util/flowgraph/input_node.go
+++ b/internal/util/flowgraph/input_node.go
@@ -43,13 +43,15 @@ const (
 // InputNode is the entry point of flowgragh
 type InputNode struct {
     BaseNode
-    input        <-chan *msgstream.MsgPack
-    lastMsg      *msgstream.MsgPack
-    name         string
-    role         string
-    nodeID       int64
-    collectionID int64
-    dataType     string
+    input           <-chan *msgstream.MsgPack
+    lastMsg         *msgstream.MsgPack
+    name            string
+    role            string
+    nodeID          int64
+    nodeIDStr       string
+    collectionID    int64
+    collectionIDStr string
+    dataType        string
 
     closeGracefully *atomic.Bool
 
@@ -117,11 +119,11 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
     sub := tsoutil.SubByNow(msgPack.EndTs)
     if inNode.role == typeutil.DataNodeRole {
         metrics.DataNodeConsumeMsgCount.
-            WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
+            WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
             Inc()
 
         metrics.DataNodeConsumeTimeTickLag.
-            WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
+            WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
             Set(float64(sub))
     }
 
@@ -192,7 +194,9 @@ func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLeng
         name:            nodeName,
         role:            role,
         nodeID:          nodeID,
+        nodeIDStr:       fmt.Sprint(nodeID),
         collectionID:    collectionID,
+        collectionIDStr: fmt.Sprint(collectionID),
         dataType:        dataType,
         closeGracefully: atomic.NewBool(CloseImmediately),
         skipCount:       0,
diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go
index 0ae56f955efe2..f38a65aea4891 100644
--- a/internal/util/flowgraph/node.go
+++ b/internal/util/flowgraph/node.go
@@ -83,16 +83,16 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
     inputNode := nodeCtxManager.inputNodeCtx
     curNode := inputNode
     // tt checker start
-    var checker *timerecord.GroupChecker
+    var checker *timerecord.Checker
     if enableTtChecker {
-        checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) {
+        manager := timerecord.GetCheckerManger("fgNode", nodeCtxTtInterval, func(list []string) {
             log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
         })
         for curNode != nil {
             name := fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name())
-            checker.Check(name)
+            checker = timerecord.NewChecker(name, manager)
             curNode = curNode.downstream
-            defer checker.Remove(name)
+            defer checker.Close()
         }
     }
 
@@ -138,7 +138,7 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
                 curNode.downstream.inputChannel <- output
             }
             if enableTtChecker {
-                checker.Check(fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name()))
+                checker.Check()
             }
             curNode = curNode.downstream
         }
diff --git a/internal/util/pipeline/node.go b/internal/util/pipeline/node.go
index fe16397dceabf..def0331794bd0 100644
--- a/internal/util/pipeline/node.go
+++ b/internal/util/pipeline/node.go
@@ -24,21 +24,20 @@ type Node interface {
     Name() string
     MaxQueueLength() int32
     Operate(in Msg) Msg
-    Start()
-    Close()
 }
 
 type nodeCtx struct {
     node Node
-    inputChannel chan Msg
-    next         *nodeCtx
-    checker      *timerecord.GroupChecker
+    InputChannel chan Msg
+
+    Next    *nodeCtx
+    Checker *timerecord.Checker
 }
 
-func newNodeCtx(node Node) *nodeCtx {
+func NewNodeCtx(node Node) *nodeCtx {
     return &nodeCtx{
         node:         node,
-        inputChannel: make(chan Msg, node.MaxQueueLength()),
+        InputChannel: make(chan Msg, node.MaxQueueLength()),
     }
 }
@@ -57,12 +56,6 @@ func (node *BaseNode) MaxQueueLength() int32 {
     return node.maxQueueLength
 }
 
-// Start implementing Node, base node does nothing when starts
-func (node *BaseNode) Start() {}
-
-// Close implementing Node, base node does nothing when stops
-func (node *BaseNode) Close() {}
-
 func NewBaseNode(name string, maxQueryLength int32) *BaseNode {
     return &BaseNode{
         name:           name,
diff --git a/internal/util/pipeline/pipeline.go b/internal/util/pipeline/pipeline.go
index 61212f4581992..cfc0db3e59832 100644
--- a/internal/util/pipeline/pipeline.go
+++ b/internal/util/pipeline/pipeline.go
@@ -37,8 +37,6 @@ type pipeline struct {
     inputChannel    chan Msg
     nodeTtInterval  time.Duration
     enableTtChecker bool
-
-    checkerNames map[string]string
 }
 
 func (p *pipeline) Add(nodes ...Node) {
@@ -48,21 +46,19 @@ func (p *pipeline) Add(nodes ...Node) {
 }
 
 func (p *pipeline) addNode(node Node) {
-    nodeCtx := newNodeCtx(node)
+    nodeCtx := NewNodeCtx(node)
     if p.enableTtChecker {
-        nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) {
+        manager := timerecord.GetCheckerManger("fgNode", p.nodeTtInterval, func(list []string) {
             log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval))
         })
-        if p.checkerNames == nil {
-            p.checkerNames = make(map[string]string)
-        }
-        p.checkerNames[nodeCtx.node.Name()] = fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
+        name := fmt.Sprintf("nodeCtxTtChecker-%s", node.Name())
+        nodeCtx.Checker = timerecord.NewChecker(name, manager)
     }
 
     if len(p.nodes) != 0 {
-        p.nodes[len(p.nodes)-1].next = nodeCtx
+        p.nodes[len(p.nodes)-1].Next = nodeCtx
     } else {
-        p.inputChannel = nodeCtx.inputChannel
+        p.inputChannel = nodeCtx.InputChannel
     }
 
     p.nodes = append(p.nodes, nodeCtx)
@@ -82,18 +78,18 @@ func (p *pipeline) process() {
     curNode := p.nodes[0]
     for curNode != nil {
-        if len(curNode.inputChannel) == 0 {
+        if len(curNode.InputChannel) == 0 {
             break
         }
 
-        input := <-curNode.inputChannel
+        input := <-curNode.InputChannel
         output := curNode.node.Operate(input)
-        if _, ok := p.checkerNames[curNode.node.Name()]; ok {
-            curNode.checker.Check(p.checkerNames[curNode.node.Name()])
+        if curNode.Checker != nil {
+            curNode.Checker.Check()
         }
-        if curNode.next != nil && output != nil {
-            curNode.next.inputChannel <- output
+        if curNode.Next != nil && output != nil {
+            curNode.Next.InputChannel <- output
         }
-        curNode = curNode.next
+        curNode = curNode.Next
     }
 }
diff --git a/pkg/util/timerecord/group_checker.go b/pkg/util/timerecord/group_checker.go
index d8502884d7938..c06dcd5ddeb9a 100644
--- a/pkg/util/timerecord/group_checker.go
+++ b/pkg/util/timerecord/group_checker.go
@@ -18,23 +18,47 @@ package timerecord
 
 import (
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // groups maintains string to GroupChecker
-var groups = typeutil.NewConcurrentMap[string, *GroupChecker]()
+var groups = typeutil.NewConcurrentMap[string, *CheckerManager]()
 
-// GroupChecker checks members in same group silent for certain period of time
+type Checker struct {
+    name        string
+    manager     *CheckerManager
+    lastChecked atomic.Value
+}
+
+func NewChecker(name string, manager *CheckerManager) *Checker {
+    checker := &Checker{}
+    checker.name = name
+    checker.manager = manager
+    checker.lastChecked.Store(time.Now())
+    manager.Register(name, checker)
+    return checker
+}
+
+func (checker *Checker) Check() {
+    checker.lastChecked.Store(time.Now())
+}
+
+func (checker *Checker) Close() {
+    checker.manager.Remove(checker.name)
+}
+
+// CheckerManager checks members in same group silent for certain period of time
 // print warning msg if there are item(s) that not reported
-type GroupChecker struct {
+type CheckerManager struct {
     groupName string
-    d       time.Duration // check duration
-    t       *time.Ticker  // internal ticker
-    ch      chan struct{} // closing signal
-    lastest *typeutil.ConcurrentMap[string, time.Time] // map member name => lastest report time
+    d        time.Duration // check duration
+    t        *time.Ticker  // internal ticker
+    ch       chan struct{} // closing signal
+    checkers *typeutil.ConcurrentMap[string, *Checker] // map member name => checker
 
     initOnce sync.Once
     stopOnce sync.Once
@@ -43,7 +67,7 @@
 
 // init start worker goroutine
 // protected by initOnce
-func (gc *GroupChecker) init() {
+func (gc *CheckerManager) init() {
     gc.initOnce.Do(func() {
         gc.ch = make(chan struct{})
         go gc.work()
@@ -51,7 +75,7 @@
 }
 
 // work is the main procedure logic
-func (gc *GroupChecker) work() {
+func (gc *CheckerManager) work() {
     gc.t = time.NewTicker(gc.d)
     defer gc.t.Stop()
 
@@ -63,8 +87,8 @@
         }
 
         var list []string
-        gc.lastest.Range(func(name string, ts time.Time) bool {
-            if time.Since(ts) > gc.d {
+        gc.checkers.Range(func(name string, checker *Checker) bool {
+            if time.Since(checker.lastChecked.Load().(time.Time)) > gc.d {
                 list = append(list, name)
             }
             return true
@@ -75,18 +99,17 @@
     }
 }
 
-// Check updates the latest timestamp for provided name
-func (gc *GroupChecker) Check(name string) {
-    gc.lastest.Insert(name, time.Now())
+func (gc *CheckerManager) Register(name string, checker *Checker) {
+    gc.checkers.Insert(name, checker)
 }
 
 // Remove deletes name from watch list
-func (gc *GroupChecker) Remove(name string) {
-    gc.lastest.GetAndRemove(name)
+func (gc *CheckerManager) Remove(name string) {
+    gc.checkers.GetAndRemove(name)
 }
 
 // Stop closes the GroupChecker
-func (gc *GroupChecker) Stop() {
+func (gc *CheckerManager) Stop() {
     gc.stopOnce.Do(func() {
         close(gc.ch)
         groups.GetAndRemove(gc.groupName)
@@ -96,12 +119,12 @@
 // GetGroupChecker returns the GroupChecker with related group name
 // if no exist GroupChecker has the provided name, a new instance will be created with provided params
 // otherwise the params will be ignored
-func GetGroupChecker(groupName string, duration time.Duration, fn func([]string)) *GroupChecker {
-    gc := &GroupChecker{
+func GetCheckerManger(groupName string, duration time.Duration, fn func([]string)) *CheckerManager {
+    gc := &CheckerManager{
         groupName: groupName,
         d:         duration,
         fn:        fn,
-        lastest:   typeutil.NewConcurrentMap[string, time.Time](),
+        checkers:  typeutil.NewConcurrentMap[string, *Checker](),
     }
     gc, loaded := groups.GetOrInsert(groupName, gc)
     if !loaded {
diff --git a/pkg/util/timerecord/group_checker_test.go b/pkg/util/timerecord/group_checker_test.go
index 4d3d84b58f2b5..cef4521abb328 100644
--- a/pkg/util/timerecord/group_checker_test.go
+++ b/pkg/util/timerecord/group_checker_test.go
@@ -23,20 +23,24 @@ import (
     "github.com/stretchr/testify/assert"
 )
 
-func TestGroupChecker(t *testing.T) {
+func TestChecker(t *testing.T) {
     groupName := `test_group`
     signal := make(chan []string, 1)
     // 10ms period which set before is too short
     // change 10ms to 500ms to ensure the group checker schedule after the second value stored
     duration := 500 * time.Millisecond
-    gc1 := GetGroupChecker(groupName, duration, func(list []string) {
+    gc1 := GetCheckerManger(groupName, duration, func(list []string) {
         signal <- list
     })
-    gc1.Check("1")
-    gc2 := GetGroupChecker(groupName, time.Second, func(list []string) {
+
+    checker1 := NewChecker("1", gc1)
+    checker1.Check()
+
+    gc2 := GetCheckerManger(groupName, time.Second, func(list []string) {
         t.FailNow()
     })
-    gc2.Check("2")
+    checker2 := NewChecker("2", gc2)
+    checker2.Check()
 
     assert.Equal(t, duration, gc2.d)
 
@@ -45,11 +49,12 @@
         return len(list) == 2
     }, duration*3, duration)
 
-    gc2.Remove("2")
-
+    checker2.Close()
     list := <-signal
     assert.ElementsMatch(t, []string{"1"}, list)
 
+    checker1.Close()
+
     assert.NotPanics(t, func() {
         gc1.Stop()
         gc2.Stop()
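
For reference, a minimal usage sketch (not part of the patch) of the refactored timerecord API introduced above: each worker now owns a Checker object created via NewChecker instead of calling Check(name) on the shared GroupChecker. Only GetCheckerManger, NewChecker, Check, Close, and Stop come from the diff; the group name, interval, and work loop below are illustrative assumptions.

package main

import (
    "fmt"
    "time"

    "github.com/milvus-io/milvus/pkg/util/timerecord"
)

func main() {
    // One CheckerManager per group; the callback lists members that stayed
    // silent for longer than the configured interval (values here are made up).
    manager := timerecord.GetCheckerManger("example-group", 2*time.Second, func(silent []string) {
        fmt.Println("no progress reported by:", silent)
    })

    // The worker registers itself once and keeps the resulting Checker,
    // so the hot path no longer formats or looks up a name on every tick.
    checker := timerecord.NewChecker("worker-1", manager)
    defer checker.Close() // unregister this member when the worker exits

    for i := 0; i < 5; i++ {
        // ... one unit of work ...
        time.Sleep(500 * time.Millisecond)
        checker.Check() // heartbeat: refresh this member's lastChecked timestamp
    }

    manager.Stop()
}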