diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index f05c232904f..b9294ad970d 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -324,14 +324,16 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int { } continue } - oc.wop.PutOperator(op) + if isMerge { // count two merge operators as one, so wopStatus.ops[desc] should // not be updated here // TODO: call checkAddOperator ... + oc.wop.PutMergeOperators([]*Operator{op, ops[i+1]}) i++ added++ - oc.wop.PutOperator(ops[i]) + } else { + oc.wop.PutOperator(op) } operatorCounter.WithLabelValues(desc, "put").Inc() oc.wopStatus.incCount(desc) diff --git a/pkg/schedule/operator/waiting_operator.go b/pkg/schedule/operator/waiting_operator.go index b3b1b885663..f75dcf25cd8 100644 --- a/pkg/schedule/operator/waiting_operator.go +++ b/pkg/schedule/operator/waiting_operator.go @@ -26,6 +26,7 @@ var priorityWeight = []float64{1.0, 4.0, 9.0, 16.0} // WaitingOperator is an interface of waiting operators. type WaitingOperator interface { PutOperator(op *Operator) + PutMergeOperators(op []*Operator) GetOperator() []*Operator ListOperator() []*Operator } @@ -66,6 +67,21 @@ func (b *randBuckets) PutOperator(op *Operator) { bucket.ops = append(bucket.ops, op) } +// PutMergeOperators puts two operators into the random buckets. +func (b *randBuckets) PutMergeOperators(ops []*Operator) { + b.mu.Lock() + defer b.mu.Unlock() + if len(ops) != 2 && (ops[0].Kind()&OpMerge == 0 || ops[1].Kind()&OpMerge == 0) { + return + } + priority := ops[0].GetPriorityLevel() + bucket := b.buckets[priority] + if len(bucket.ops) == 0 { + b.totalWeight += bucket.weight + } + bucket.ops = append(bucket.ops, ops...) +} + // ListOperator lists all operator in the random buckets. func (b *randBuckets) ListOperator() []*Operator { b.mu.Lock()