Skip to content

Commit

Permalink
enhance: optimize datanode cpu usage under large collection number (m…
Browse files Browse the repository at this point in the history
…ilvus-io#33267)

fix milvus-io#33266 
try to improve cpu usage by refactoring the ttchecker logic and caching
string

Signed-off-by: xiaofanluan <[email protected]>
  • Loading branch information
xiaofan-luan authored May 24, 2024
1 parent ed883b3 commit 36cbce4
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 83 deletions.
19 changes: 11 additions & 8 deletions internal/datanode/flow_graph_dd_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,9 @@ func (ddn *ddNode) IsValidInMsg(in []Msg) bool {

// Operate handles input messages, implementing flowgrpah.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
log := log.With(zap.String("channel", ddn.vChannelName))
msMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
log.Warn("type assertion failed for MsgStreamMsg", zap.String("channel", ddn.vChannelName), zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
}

Expand All @@ -110,12 +109,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
endPositions: msMsg.EndPositions(),
dropCollection: false,
}
log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID))
log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.String("channel", ddn.vChannelName), zap.Int64("collection", ddn.collectionID))
return []Msg{&fgMsg}
}

if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.RatedInfo(1.0, "ddNode in dropMode")
log.RatedInfo(1.0, "ddNode in dropMode", zap.String("channel", ddn.vChannelName))
return []Msg{}
}

Expand Down Expand Up @@ -146,18 +145,18 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
log.Info("Receiving DropCollection msg")
log.Info("Receiving DropCollection msg", zap.String("channel", ddn.vChannelName))
ddn.dropMode.Store(true)

log.Info("Stop compaction for dropped channel")
log.Info("Stop compaction for dropped channel", zap.String("channel", ddn.vChannelName))
ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName)
fgMsg.dropCollection = true
}

case commonpb.MsgType_DropPartition:
dpMsg := msg.(*msgstream.DropPartitionMsg)
if dpMsg.GetCollectionID() == ddn.collectionID {
log.Info("drop partition msg received", zap.Int64("partitionID", dpMsg.GetPartitionID()))
log.Info("drop partition msg received", zap.String("channel", ddn.vChannelName), zap.Int64("partitionID", dpMsg.GetPartitionID()))
fgMsg.dropPartitions = append(fgMsg.dropPartitions, dpMsg.PartitionID)
}

Expand All @@ -166,13 +165,15 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
if imsg.CollectionID != ddn.collectionID {
log.Warn("filter invalid insert message, collection mis-match",
zap.Int64("Get collID", imsg.CollectionID),
zap.String("channel", ddn.vChannelName),
zap.Int64("Expected collID", ddn.collectionID))
continue
}

if ddn.tryToFilterSegmentInsertMessages(imsg) {
log.Debug("filter insert messages",
zap.Int64("filter segmentID", imsg.GetSegmentID()),
zap.String("channel", ddn.vChannelName),
zap.Uint64("message timestamp", msg.EndTs()),
)
continue
Expand All @@ -194,6 +195,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {

log.Debug("DDNode receive insert messages",
zap.Int64("segmentID", imsg.GetSegmentID()),
zap.String("channel", ddn.vChannelName),
zap.Int("numRows", len(imsg.GetRowIDs())))
fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)

Expand All @@ -203,11 +205,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
if dmsg.CollectionID != ddn.collectionID {
log.Warn("filter invalid DeleteMsg, collection mis-match",
zap.Int64("Get collID", dmsg.CollectionID),
zap.String("channel", ddn.vChannelName),
zap.Int64("Expected collID", ddn.collectionID))
continue
}

log.Debug("DDNode receive delete messages", zap.Int64("numRows", dmsg.NumRows))
log.Debug("DDNode receive delete messages", zap.String("channel", ddn.vChannelName), zap.Int64("numRows", dmsg.NumRows))
rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))

metrics.DataNodeConsumeBytesCount.
Expand Down
3 changes: 1 addition & 2 deletions internal/datanode/timetick_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) {
m.mu.Lock()
defer m.mu.Unlock()
sizeBeforeClean := len(m.statsCache)
log := log.With(zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean))
for channelName, lastSentTs := range lastSentTss {
_, ok := m.statsCache[channelName]
if ok {
Expand All @@ -162,7 +161,7 @@ func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) {
delete(m.statsCache, channelName)
}
}
log.RatedDebug(30, "timeTickSender stats", zap.Int("sizeAfterClean", len(m.statsCache)))
log.RatedDebug(30, "timeTickSender stats", zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean), zap.Int("sizeAfterClean", len(m.statsCache)))
}

func (m *timeTickSender) sendReport(ctx context.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
switch {
case bufferCandidate == nil && syncCandidate == nil:
// all buffer are empty
log.RatedInfo(60, "checkpoint from latest consumed msg")
log.RatedDebug(60, "checkpoint from latest consumed msg")
return wb.checkpoint
case bufferCandidate == nil && syncCandidate != nil:
checkpoint = syncCandidate
Expand All @@ -260,7 +260,7 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
cpSource = "syncManager"
}

log.RatedInfo(20, "checkpoint evaluated",
log.RatedDebug(20, "checkpoint evaluated",
zap.String("cpSource", cpSource),
zap.Int64("segmentID", segmentID),
zap.Uint64("cpTimestamp", checkpoint.GetTimestamp()))
Expand Down
22 changes: 13 additions & 9 deletions internal/util/flowgraph/input_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ const (
// InputNode is the entry point of flowgragh
type InputNode struct {
BaseNode
input <-chan *msgstream.MsgPack
lastMsg *msgstream.MsgPack
name string
role string
nodeID int64
collectionID int64
dataType string
input <-chan *msgstream.MsgPack
lastMsg *msgstream.MsgPack
name string
role string
nodeID int64
nodeIDStr string
collectionID int64
collectionIDStr string
dataType string

closeGracefully *atomic.Bool

Expand Down Expand Up @@ -117,11 +119,11 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
sub := tsoutil.SubByNow(msgPack.EndTs)
if inNode.role == typeutil.DataNodeRole {
metrics.DataNodeConsumeMsgCount.
WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
Inc()

metrics.DataNodeConsumeTimeTickLag.
WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
WithLabelValues(inNode.nodeIDStr, inNode.dataType, inNode.collectionIDStr).
Set(float64(sub))
}

Expand Down Expand Up @@ -192,7 +194,9 @@ func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLeng
name: nodeName,
role: role,
nodeID: nodeID,
nodeIDStr: fmt.Sprint(nodeID),
collectionID: collectionID,
collectionIDStr: fmt.Sprint(collectionID),
dataType: dataType,
closeGracefully: atomic.NewBool(CloseImmediately),
skipCount: 0,
Expand Down
10 changes: 5 additions & 5 deletions internal/util/flowgraph/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
inputNode := nodeCtxManager.inputNodeCtx
curNode := inputNode
// tt checker start
var checker *timerecord.GroupChecker
var checker *timerecord.Checker
if enableTtChecker {
checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) {
manager := timerecord.GetCheckerManger("fgNode", nodeCtxTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
})
for curNode != nil {
name := fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name())
checker.Check(name)
checker = timerecord.NewChecker(name, manager)
curNode = curNode.downstream
defer checker.Remove(name)
defer checker.Close()
}
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
curNode.downstream.inputChannel <- output
}
if enableTtChecker {
checker.Check(fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name()))
checker.Check()
}
curNode = curNode.downstream
}
Expand Down
19 changes: 6 additions & 13 deletions internal/util/pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@ type Node interface {
Name() string
MaxQueueLength() int32
Operate(in Msg) Msg
Start()
Close()
}

type nodeCtx struct {
node Node
inputChannel chan Msg
next *nodeCtx
checker *timerecord.GroupChecker
InputChannel chan Msg

Next *nodeCtx
Checker *timerecord.Checker
}

func newNodeCtx(node Node) *nodeCtx {
func NewNodeCtx(node Node) *nodeCtx {
return &nodeCtx{
node: node,
inputChannel: make(chan Msg, node.MaxQueueLength()),
InputChannel: make(chan Msg, node.MaxQueueLength()),
}
}

Expand All @@ -57,12 +56,6 @@ func (node *BaseNode) MaxQueueLength() int32 {
return node.maxQueueLength
}

// Start implementing Node, base node does nothing when starts
func (node *BaseNode) Start() {}

// Close implementing Node, base node does nothing when stops
func (node *BaseNode) Close() {}

func NewBaseNode(name string, maxQueryLength int32) *BaseNode {
return &BaseNode{
name: name,
Expand Down
30 changes: 13 additions & 17 deletions internal/util/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ type pipeline struct {
inputChannel chan Msg
nodeTtInterval time.Duration
enableTtChecker bool

checkerNames map[string]string
}

func (p *pipeline) Add(nodes ...Node) {
Expand All @@ -48,21 +46,19 @@ func (p *pipeline) Add(nodes ...Node) {
}

func (p *pipeline) addNode(node Node) {
nodeCtx := newNodeCtx(node)
nodeCtx := NewNodeCtx(node)
if p.enableTtChecker {
nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) {
manager := timerecord.GetCheckerManger("fgNode", p.nodeTtInterval, func(list []string) {
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval))
})
if p.checkerNames == nil {
p.checkerNames = make(map[string]string)
}
p.checkerNames[nodeCtx.node.Name()] = fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
name := fmt.Sprintf("nodeCtxTtChecker-%s", node.Name())
nodeCtx.Checker = timerecord.NewChecker(name, manager)
}

if len(p.nodes) != 0 {
p.nodes[len(p.nodes)-1].next = nodeCtx
p.nodes[len(p.nodes)-1].Next = nodeCtx
} else {
p.inputChannel = nodeCtx.inputChannel
p.inputChannel = nodeCtx.InputChannel
}

p.nodes = append(p.nodes, nodeCtx)
Expand All @@ -82,18 +78,18 @@ func (p *pipeline) process() {

curNode := p.nodes[0]
for curNode != nil {
if len(curNode.inputChannel) == 0 {
if len(curNode.InputChannel) == 0 {
break
}

input := <-curNode.inputChannel
input := <-curNode.InputChannel
output := curNode.node.Operate(input)
if _, ok := p.checkerNames[curNode.node.Name()]; ok {
curNode.checker.Check(p.checkerNames[curNode.node.Name()])
if curNode.Checker != nil {
curNode.Checker.Check()
}
if curNode.next != nil && output != nil {
curNode.next.inputChannel <- output
if curNode.Next != nil && output != nil {
curNode.Next.InputChannel <- output
}
curNode = curNode.next
curNode = curNode.Next
}
}
Loading

0 comments on commit 36cbce4

Please sign in to comment.