From e74cfefe738d9a6a2ee2ea0983c615754789da9a Mon Sep 17 00:00:00 2001
From: wangzhuowei
Date: Wed, 25 Oct 2023 19:25:06 +0800
Subject: [PATCH] feat: use custom scheduler

---
 connection_onevent.go |   7 +-
 scheduler.go          | 191 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 195 insertions(+), 3 deletions(-)
 create mode 100644 scheduler.go

diff --git a/connection_onevent.go b/connection_onevent.go
index 9b87f01b..f21a949e 100644
--- a/connection_onevent.go
+++ b/connection_onevent.go
@@ -19,12 +19,13 @@ package netpoll
 
 import (
 	"context"
+	"runtime"
 	"sync/atomic"
-
-	"github.com/bytedance/gopkg/util/gopool"
 )
 
-var runTask = gopool.CtxGo
+var scheduler = NewScheduler(runtime.GOMAXPROCS(0))
+
+var runTask = scheduler.Go
 
 func disableGopool() error {
 	runTask = func(ctx context.Context, f func()) {
diff --git a/scheduler.go b/scheduler.go
new file mode 100644
index 00000000..04b7f0ba
--- /dev/null
+++ b/scheduler.go
@@ -0,0 +1,191 @@
+package netpoll
+
+import (
+	"context"
+	"log"
+	"runtime"
+	"runtime/debug"
+	"sync"
+	"sync/atomic"
+)
+
+// Task is a unit of work submitted to the scheduler.
+type Task func()
+
+const defaultTaskBufferCap = 1024 * 64
+
+// TaskBuffer is a mutex-protected ring buffer of tasks shared by all processors.
+type TaskBuffer struct {
+	lock  sync.Mutex
+	head  int64
+	tail  int64
+	cap   int64
+	tasks []Task
+}
+
+func NewTaskBuffer(cap ...int) *TaskBuffer {
+	var capacity int64
+	if len(cap) > 0 {
+		capacity = int64(cap[0])
+	} else {
+		capacity = defaultTaskBufferCap
+	}
+	return &TaskBuffer{
+		cap:   capacity,
+		tasks: make([]Task, capacity),
+	}
+}
+
+// Push enqueues t at the tail and returns the number of buffered tasks.
+// Note: the ring does not detect overflow; if cap tasks accumulate without
+// being consumed, further pushes silently overwrite the oldest entries.
+func (s *TaskBuffer) Push(t Task) (size int64) {
+	s.lock.Lock()
+	s.tasks[s.tail] = t
+	s.tail = (s.tail + 1) % s.cap
+	size = s.tail - s.head
+	if size < 0 {
+		size = s.cap - s.head + s.tail
+	}
+	s.lock.Unlock()
+	return size
+}
+
+// Next dequeues up to n tasks from the head; n == 0 means "take everything".
+// The common path returns a sub-slice of the ring without copying; only a
+// batch that wraps past the end of the ring is copied out.
+func (s *TaskBuffer) Next(n int) (ts []Task) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	nn := int64(n)
+	if nn == 0 || nn > s.cap {
+		nn = s.cap
+	}
+
+	head := s.head
+	tail := s.tail
+	size := tail - head
+	if size < 0 {
+		size = s.cap - head + tail
+	}
+	if nn > size {
+		nn = size
+	}
+	if head+nn < s.cap {
+		s.head = head + nn
+		return s.tasks[head:s.head]
+	}
+	ts = make([]Task, nn)
+	left := s.cap - head
+	copy(ts[:left], s.tasks[head:s.cap])
+	copy(ts[left:], s.tasks[:nn-left])
+	s.head = (head + nn) % s.cap // wrap the head index back into the ring
+	return ts
+}
+
+// Pop removes and returns the most recently pushed task, or nil if empty.
+func (s *TaskBuffer) Pop() (t Task) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	if s.head == s.tail {
+		return nil
+	}
+	// Go's % keeps the dividend's sign, so add cap before reducing to
+	// avoid a negative index when tail is 0.
+	s.tail = (s.tail - 1 + s.cap) % s.cap
+	t = s.tasks[s.tail]
+	s.tasks[s.tail] = nil
+	return t
+}
+
+// Processor drains batches of tasks from the shared buffer and runs each
+// task in its own goroutine.
+type Processor struct {
+	id      int
+	trigger chan struct{}
+	tasks   *TaskBuffer
+}
+
+var _IDGen int32
+
+func NewProcessor(tasks *TaskBuffer) (p *Processor) {
+	// Buffer one wakeup so a signal sent after Next returns empty but
+	// before harvest parks on the channel is not lost.
+	trigger := make(chan struct{}, 1)
+	id := atomic.AddInt32(&_IDGen, 1) - 1
+	p = &Processor{
+		id:      int(id),
+		trigger: trigger,
+		tasks:   tasks,
+	}
+	go p.harvest()
+	return p
+}
+
+// Wakeup nudges a parked processor; internal call wakep().
+func (p *Processor) Wakeup() {
+	select {
+	case p.trigger <- struct{}{}:
+	default:
+	}
+}
+
+func (p *Processor) harvest() {
+	var ts []Task
+	var gone int
+	for {
+		ts = p.tasks.Next(10)
+		if len(ts) == 0 {
+			gone = 0
+			<-p.trigger // park until the next Wakeup
+			continue
+		}
+		gone += len(ts)
+		for i := range ts {
+			runner := ts[i]
+			go runner()
+			ts[i] = nil // clear the slot so the task can be collected
+		}
+		// release P to local Gs after every ~20 spawned tasks
+		if gone >= 20 {
+			gone = 0
+			runtime.Gosched()
+		}
+		//wakep()
+	}
+}
+
+// Scheduler spreads tasks across a fixed set of processors.
+type Scheduler struct {
+	tasks *TaskBuffer
+	ps    []*Processor
+	seed  uint64
+}
+
+func NewScheduler(procs int) *Scheduler {
+	sche := new(Scheduler)
+	sche.tasks = NewTaskBuffer()
+	sche.ps = make([]*Processor, procs)
+	for p := 0; p < procs; p++ {
+		sche.ps[p] = NewProcessor(sche.tasks)
+	}
+	return sche
+}
+
+// Go wraps f with panic recovery, enqueues it, and wakes one processor in
+// round-robin order. ctx is unused; it is kept to match gopool.CtxGo.
+func (s *Scheduler) Go(ctx context.Context, f func()) {
+	var t Task = func() {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Printf("NETPOLL: panic in scheduler: %v: %s", r, debug.Stack())
+			}
+		}()
+		f()
+	}
+	_ = s.tasks.Push(t)
+	// runs on other Ps (probably)
+	s.ps[atomic.AddUint64(&s.seed, 1)%uint64(len(s.ps))].Wakeup()
+}
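
Reviewer note, not part of the patch: after this change every runTask call (and therefore every connection callback netpoll dispatches through it) flows through Scheduler.Go instead of gopool.CtxGo. A minimal smoke test of that path could look like the sketch below; the test name and iteration count are illustrative, and it assumes the file sits next to scheduler.go in package netpoll.

package netpoll

import (
	"context"
	"runtime"
	"sync"
	"testing"
)

// Sketch: push a burst of tasks through Scheduler.Go and require that every
// one of them runs. A lost wakeup or a stuck processor shows up as a hang.
func TestSchedulerGo(t *testing.T) {
	sched := NewScheduler(runtime.GOMAXPROCS(0))

	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		sched.Go(context.Background(), func() {
			wg.Done()
		})
	}
	wg.Wait()
}

The buffered trigger channel is what makes this reliable: with an unbuffered channel, a Wakeup that fires between a processor observing an empty buffer and parking on <-p.trigger would be dropped, and the last tasks could sit unserved until the next Push.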
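
The head/tail arithmetic in TaskBuffer is the subtle part of the change, so here is a small trace over a toy capacity. It is reviewer scaffolding under the same package-netpoll assumption as above, not part of the patch.

package netpoll

import "testing"

// Walks both the fast (contiguous) and wrap-around paths of the ring with
// cap = 4; the expected head/tail values are noted on each step.
func TestTaskBufferWraparound(t *testing.T) {
	buf := NewTaskBuffer(4)
	for i := 0; i < 3; i++ {
		buf.Push(func() {}) // head=0; tail advances to 1, 2, 3
	}
	if got := len(buf.Next(2)); got != 2 { // fast path: head=2, tail=3
		t.Fatalf("Next(2) returned %d tasks", got)
	}
	buf.Push(func() {}) // tail wraps: (3+1)%4 = 0
	buf.Push(func() {}) // tail=1; size = cap-head+tail = 4-2+1 = 3
	if got := len(buf.Next(0)); got != 3 { // n=0 drains; copies across the wrap
		t.Fatalf("Next(0) returned %d tasks", got)
	}
	if buf.Pop() != nil { // empty again: head == tail == 1
		t.Fatal("Pop on an empty buffer should return nil")
	}
}

Worth noting while reading the type: Next consumes from the head (FIFO) while Pop consumes from the tail (LIFO), so TaskBuffer is effectively a deque; nothing in this patch calls Pop yet.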