Skip to content

Commit

Permalink
fix: make qcv2 observer dispatcher execute exactly once (milvus-io#28472)
Browse files Browse the repository at this point in the history

See also milvus-io#28466

In `taskDispatcher.schedule`, the same task may be resubmitted if the
previous round has not finished yet.
In this case, TaskObserver.check may set the current target by mistake,
which may cause random search/query failures.

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Nov 16, 2023
1 parent bed7467 commit 81caf02
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 10 deletions.
26 changes: 16 additions & 10 deletions internal/querycoordv2/observers/task_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (

// taskDispatcher is the utility to provide task dedup and dispatch feature
type taskDispatcher[K comparable] struct {
tasks *typeutil.ConcurrentSet[K]
// tasks is the map of registered tasks
// key is the task identifier
// value is the flag indicating whether the task has been submitted to the task pool
tasks *typeutil.ConcurrentMap[K, bool]
pool *conc.Pool[any]
notifyCh chan struct{}
taskRunner task[K]
Expand All @@ -40,7 +43,7 @@ type task[K comparable] func(context.Context, K)

func newTaskDispatcher[K comparable](runner task[K]) *taskDispatcher[K] {
return &taskDispatcher[K]{
tasks: typeutil.NewConcurrentSet[K](),
tasks: typeutil.NewConcurrentMap[K, bool](),
pool: conc.NewPool[any](paramtable.Get().QueryCoordCfg.ObserverTaskParallel.GetAsInt()),
notifyCh: make(chan struct{}, 1),
taskRunner: runner,
Expand Down Expand Up @@ -70,7 +73,8 @@ func (d *taskDispatcher[K]) Stop() {
func (d *taskDispatcher[K]) AddTask(keys ...K) {
var added bool
for _, key := range keys {
added = d.tasks.Insert(key) || added
_, loaded := d.tasks.GetOrInsert(key, false)
added = added || !loaded
}
if added {
d.notify()
Expand All @@ -90,13 +94,15 @@ func (d *taskDispatcher[K]) schedule(ctx context.Context) {
case <-ctx.Done():
return
case <-d.notifyCh:
d.tasks.Range(func(k K) bool {
d.tasks.Insert(k)
d.pool.Submit(func() (any, error) {
d.taskRunner(ctx, k)
d.tasks.Remove(k)
return struct{}{}, nil
})
d.tasks.Range(func(k K, submitted bool) bool {
if !submitted {
d.pool.Submit(func() (any, error) {
d.taskRunner(ctx, k)
d.tasks.Remove(k)
return struct{}{}, nil
})
d.tasks.Insert(k, true)
}
return true
})
}
Expand Down
46 changes: 46 additions & 0 deletions internal/querycoordv2/observers/task_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package observers

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// taskDispatcherSuite is the testify suite that groups the taskDispatcher
// tests defined in this file. It carries no state of its own beyond the
// embedded suite.Suite.
type taskDispatcherSuite struct {
	suite.Suite
}

// SetupSuite initializes the global paramtable from a fresh base table.
// newTaskDispatcher reads QueryCoordCfg.ObserverTaskParallel from the
// paramtable to size its pool, so the table must be initialized first.
func (s *taskDispatcherSuite) SetupSuite() {
	params := paramtable.Get()
	params.Init(paramtable.NewBaseTable())
}

// TestMultipleSubmit registers two overlapping batches of keys (1-5, then
// 2-6) while the runner is still busy, waits for six runner invocations to
// finish, and checks that the set of executed keys is exactly 1 through 6.
func (s *taskDispatcherSuite) TestMultipleSubmit() {
	// The two AddTask calls below cover six distinct keys in total.
	expected := []int64{1, 2, 3, 4, 5, 6}

	executed := typeutil.NewConcurrentSet[int64]()
	pending := &sync.WaitGroup{}
	pending.Add(len(expected))

	dispatcher := newTaskDispatcher(func(ctx context.Context, key int64) {
		defer pending.Done()
		executed.Insert(key)
		// Keep the task running long enough that the second AddTask call
		// arrives while the first batch is still in flight.
		time.Sleep(time.Second)
	})
	dispatcher.Start()

	dispatcher.AddTask(1, 2, 3, 4, 5)
	dispatcher.AddTask(2, 3, 4, 5, 6)
	pending.Wait()

	s.ElementsMatch(expected, executed.Collect())

	dispatcher.Stop()
}

// TestTaskDispatcher is the `go test` entry point that runs every test
// method defined on taskDispatcherSuite.
func TestTaskDispatcher(t *testing.T) {
	suite.Run(t, &taskDispatcherSuite{})
}

0 comments on commit 81caf02

Please sign in to comment.