Skip to content

Commit

Permalink
feat: lazy init pollers to avoid create any poller goroutines if netp…
Browse files Browse the repository at this point in the history
…oll is not used (#306)
  • Loading branch information
joway authored Feb 20, 2024
1 parent b193834 commit 7ba622b
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 45 deletions.
8 changes: 7 additions & 1 deletion netpoll_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ func SetNumLoops(numLoops int) error {

// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt
// to distribute the incoming connections between multiple polls.
// This option only works when NumLoops is set.
// This option only works when numLoops is set.
func SetLoadBalance(lb LoadBalance) error {
return setLoadBalance(lb)
}

// Initialize the pollers actively. By default, it's lazy initialized.
// It's safe to call it multi times.
func Initialize() {
initialize()
}

func SetLoggerOutput(w io.Writer) {
setLoggerOutput(w)
}
Expand Down
118 changes: 77 additions & 41 deletions poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"log"
"os"
"runtime"
"sync/atomic"
)

func setNumLoops(numLoops int) error {
Expand All @@ -33,57 +34,55 @@ func setLoadBalance(lb LoadBalance) error {
return pollmanager.SetLoadBalance(lb)
}

func initialize() {
// The first call of Pick() will init pollers
_ = pollmanager.Pick()
}

func setLoggerOutput(w io.Writer) {
logger = log.New(w, "", log.LstdFlags)
}

// manage all pollers
// pollmanager manage all pollers
var pollmanager *manager
var logger *log.Logger

func init() {
var loops = runtime.GOMAXPROCS(0)/20 + 1
pollmanager = &manager{}
pollmanager.SetLoadBalance(RoundRobin)
pollmanager.SetNumLoops(loops)

pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1)
setLoggerOutput(os.Stderr)
}

const (
managerUninitialized = iota
managerInitializing
managerInitialized
)

func newManager(numLoops int) *manager {
m := new(manager)
m.SetLoadBalance(RoundRobin)
m.SetNumLoops(numLoops)
return m
}

// LoadBalance is used to do load balancing among multiple pollers.
// a single poller may not be optimal if the number of cores is large (40C+).
type manager struct {
NumLoops int
numLoops int32
status int32 // 0: uninitialized, 1: initializing, 2: initialized
balance loadbalance // load balancing method
polls []Poll // all the polls
}

// SetNumLoops will return error when set numLoops < 1
func (m *manager) SetNumLoops(numLoops int) error {
func (m *manager) SetNumLoops(numLoops int) (err error) {
if numLoops < 1 {
return fmt.Errorf("set invalid numLoops[%d]", numLoops)
}

if numLoops < m.NumLoops {
// if less than, close the redundant pollers
var polls = make([]Poll, numLoops)
for idx := 0; idx < m.NumLoops; idx++ {
if idx < numLoops {
polls[idx] = m.polls[idx]
} else {
if err := m.polls[idx].Close(); err != nil {
logger.Printf("NETPOLL: poller close failed: %v\n", err)
}
}
}
m.NumLoops = numLoops
m.polls = polls
m.balance.Rebalance(m.polls)
return nil
}

m.NumLoops = numLoops
return m.Run()
// note: set new numLoops first and then change the status
atomic.StoreInt32(&m.numLoops, int32(numLoops))
atomic.StoreInt32(&m.status, managerUninitialized)
return nil
}

// SetLoadBalance set load balance.
Expand All @@ -96,14 +95,14 @@ func (m *manager) SetLoadBalance(lb LoadBalance) error {
}

// Close release all resources.
func (m *manager) Close() error {
func (m *manager) Close() (err error) {
for _, poll := range m.polls {
poll.Close()
err = poll.Close()
}
m.NumLoops = 0
m.numLoops = 0
m.balance = nil
m.polls = nil
return nil
return err
}

// Run all pollers.
Expand All @@ -114,16 +113,34 @@ func (m *manager) Run() (err error) {
}
}()

// new poll to fill delta.
for idx := len(m.polls); idx < m.NumLoops; idx++ {
var poll Poll
poll, err = openPoll()
if err != nil {
return
numLoops := int(atomic.LoadInt32(&m.numLoops))
if numLoops == len(m.polls) {
return nil
}
var polls = make([]Poll, numLoops)
if numLoops < len(m.polls) {
// shrink polls
copy(polls, m.polls[:numLoops])
for idx := numLoops; idx < len(m.polls); idx++ {
// close redundant polls
if err = m.polls[idx].Close(); err != nil {
logger.Printf("NETPOLL: poller close failed: %v\n", err)
}
}
} else {
// growth polls
copy(polls, m.polls)
for idx := len(m.polls); idx < numLoops; idx++ {
var poll Poll
poll, err = openPoll()
if err != nil {
return err
}
polls[idx] = poll
go poll.Wait()
}
m.polls = append(m.polls, poll)
go poll.Wait()
}
m.polls = polls

// LoadBalance must be set before calling Run, otherwise it will panic.
m.balance.Rebalance(m.polls)
Expand All @@ -141,5 +158,24 @@ func (m *manager) Reset() error {

// Pick will select the poller for use each time based on the LoadBalance.
func (m *manager) Pick() Poll {
START:
// fast path
if atomic.LoadInt32(&m.status) == managerInitialized {
return m.balance.Pick()
}
// slow path
// try to get initializing lock failed, wait others finished the init work, and try again
if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) {
runtime.Gosched()
goto START
}
// adjust polls
// m.Run() will finish very quickly, so will not many goroutines block on Pick.
_ = m.Run()

if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) {
// SetNumLoops called during m.Run() which cause CAS failed
// The polls will be adjusted next Pick
}
return m.balance.Pick()
}
46 changes: 43 additions & 3 deletions poll_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package netpoll

import (
"runtime"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -45,9 +47,47 @@ func TestPollManager(t *testing.T) {
}

func TestPollManagerReset(t *testing.T) {
n := pollmanager.NumLoops
n := pollmanager.numLoops
err := pollmanager.Reset()
MustNil(t, err)
Equal(t, len(pollmanager.polls), n)
Equal(t, pollmanager.NumLoops, n)
Equal(t, len(pollmanager.polls), int(n))
}

func TestPollManagerSetNumLoops(t *testing.T) {
pm := newManager(1)

startGs := runtime.NumGoroutine()
poll := pm.Pick()
newGs := runtime.NumGoroutine()
Assert(t, poll != nil)
Assert(t, newGs-startGs == 1, newGs, startGs)
t.Logf("old=%d, new=%d", startGs, newGs)

// change pollers
oldGs := newGs
err := pm.SetNumLoops(100)
MustNil(t, err)
newGs = runtime.NumGoroutine()
t.Logf("old=%d, new=%d", oldGs, newGs)
Assert(t, newGs == oldGs)

// trigger polls adjustment
var wg sync.WaitGroup
finish := make(chan struct{})
oldGs = startGs + 32 // 32 self goroutines
for i := 0; i < 32; i++ {
wg.Add(1)
go func() {
poll := pm.Pick()
newGs := runtime.NumGoroutine()
t.Logf("old=%d, new=%d", oldGs, newGs)
Assert(t, poll != nil)
Assert(t, newGs-oldGs == 100)
Assert(t, len(pm.polls) == 100)
wg.Done()
<-finish // hold goroutines
}()
}
wg.Wait()
close(finish)
}

0 comments on commit 7ba622b

Please sign in to comment.