Skip to content

Commit

Permalink
feat: use custom scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Oct 25, 2023
1 parent 832b64d commit e74cfef
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 3 deletions.
7 changes: 4 additions & 3 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package netpoll

import (
"context"
"runtime"
"sync/atomic"

"github.com/bytedance/gopkg/util/gopool"
)

var runTask = gopool.CtxGo
var scheduler = NewScheduler(runtime.GOMAXPROCS(0))

var runTask = scheduler.Go

func disableGopool() error {
runTask = func(ctx context.Context, f func()) {
Expand Down
172 changes: 172 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package netpoll

import (
"context"
"log"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
)

type Task func()

const defaultTaskBufferCap = 1024 * 64

type TaskBuffer struct {
lock sync.Mutex
head int64
tail int64
cap int64
tasks []Task
}

func NewTaskBuffer(cap ...int) *TaskBuffer {
var capacity int64
if len(cap) > 0 {
capacity = int64(cap[0])
} else {
capacity = defaultTaskBufferCap
}
return &TaskBuffer{
cap: capacity,
tasks: make([]Task, capacity),
}
}

func (s *TaskBuffer) Push(t Task) (size int64) {
s.lock.Lock()
s.tasks[s.tail] = t
s.tail = (s.tail + 1) % s.cap
size = s.tail - s.head
if size < 0 {
size = s.cap - s.head + s.tail
}
s.lock.Unlock()
return size
}

func (s *TaskBuffer) Next(n int) (ts []Task) {
s.lock.Lock()
defer s.lock.Unlock()

nn := int64(n)
if nn == 0 || nn > s.cap {
nn = s.cap
}

head := s.head
tail := s.tail
size := tail - head
if size < 0 {
size = s.cap - head + tail
}
if nn > size {
nn = size
}
if head+nn < s.cap {
s.head = s.head + nn
return s.tasks[head:s.head]
}
ts = make([]Task, nn)
left := s.cap - head
copy(ts[:left], s.tasks[head:s.cap])
copy(ts[left:], s.tasks[:nn-left])
s.head = s.head + nn
return ts
}

func (s *TaskBuffer) Pop() (t Task) {
s.lock.Lock()
defer s.lock.Unlock()

if s.head == s.tail {
return nil
}
s.tail = (s.tail - 1) % s.cap
t = s.tasks[s.tail]
s.tasks[s.tail] = nil
return t
}

type Processor struct {
id int
trigger chan struct{}
tasks *TaskBuffer
}

var _IDGen int32

func NewProcessor(tasks *TaskBuffer) (p *Processor) {
trigger := make(chan struct{})
id := atomic.AddInt32(&_IDGen, 1) - 1
p = &Processor{
id: int(id),
trigger: trigger,
tasks: tasks,
}
go p.harvest()
return p
}

func (p *Processor) Wakeup() {
// internal call wakep()
select {
case p.trigger <- struct{}{}:
default:
}
}

func (p *Processor) harvest() {
var ts []Task
var gone int
for {
ts = p.tasks.Next(10)
if len(ts) == 0 {
gone = 0
<-p.trigger
continue
}
gone += len(ts)
for i := range ts {
runner := ts[i]
go runner()
ts[i] = nil
}
// release P to local Gs
if gone >= 20 {
runtime.Gosched()
}
//wakep()
}
}

type Scheduler struct {
tasks *TaskBuffer
ps []*Processor
seed uint64
}

func NewScheduler(procs int) *Scheduler {
sche := new(Scheduler)
sche.tasks = NewTaskBuffer()
sche.ps = make([]*Processor, procs)
for p := 0; p < procs; p++ {
sche.ps[p] = NewProcessor(sche.tasks)
}
return sche
}

func (s *Scheduler) Go(ctx context.Context, f func()) {
var t Task = func() {
defer func() {
if r := recover(); r != nil {
log.Printf("NETPOLL: panic in scheduler: %v: %s", r, debug.Stack())
}
}()
f()
}
_ = s.tasks.Push(t)
// runs on other Ps (probably)
s.ps[atomic.AddUint64(&s.seed, 1)%uint64(len(s.ps))].Wakeup()
}

0 comments on commit e74cfef

Please sign in to comment.