From 7ba622bf763b69fcb3fa7ca43bcbaea9adecc6b2 Mon Sep 17 00:00:00 2001
From: Joway
Date: Tue, 20 Feb 2024 17:04:56 +0800
Subject: [PATCH] feat: lazy init pollers to avoid creating poller goroutines
 if netpoll is not used (#306)

---
 netpoll_options.go   |   8 ++-
 poll_manager.go      | 118 ++++++++++++++++++++++++++++---------------
 poll_manager_test.go |  46 +++++++++++++++--
 3 files changed, 127 insertions(+), 45 deletions(-)

diff --git a/netpoll_options.go b/netpoll_options.go
index db752fe4..2cdb1c13 100644
--- a/netpoll_options.go
+++ b/netpoll_options.go
@@ -40,11 +40,17 @@ func SetNumLoops(numLoops int) error {
 
 // SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt
 // to distribute the incoming connections between multiple polls.
-// This option only works when NumLoops is set.
+// This option only works when numLoops is set.
 func SetLoadBalance(lb LoadBalance) error {
 	return setLoadBalance(lb)
 }
 
+// Initialize initializes the pollers eagerly. By default, they are
+// initialized lazily. It is safe to call it multiple times.
+func Initialize() {
+	initialize()
+}
+
 func SetLoggerOutput(w io.Writer) {
 	setLoggerOutput(w)
 }
diff --git a/poll_manager.go b/poll_manager.go
index 119187c0..4183ac3d 100644
--- a/poll_manager.go
+++ b/poll_manager.go
@@ -23,6 +23,7 @@ import (
 	"log"
 	"os"
 	"runtime"
+	"sync/atomic"
 )
 
 func setNumLoops(numLoops int) error {
@@ -33,57 +34,55 @@ func setLoadBalance(lb LoadBalance) error {
 	return pollmanager.SetLoadBalance(lb)
 }
 
+func initialize() {
+	// the first call of Pick() initializes the pollers
+	_ = pollmanager.Pick()
+}
+
 func setLoggerOutput(w io.Writer) {
 	logger = log.New(w, "", log.LstdFlags)
 }
 
-// manage all pollers
+// pollmanager manages all pollers
var pollmanager *manager
 var logger *log.Logger
 
 func init() {
-	var loops = runtime.GOMAXPROCS(0)/20 + 1
-	pollmanager = &manager{}
-	pollmanager.SetLoadBalance(RoundRobin)
-	pollmanager.SetNumLoops(loops)
-
+	pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1)
 	setLoggerOutput(os.Stderr)
 }
 
+const (
+	managerUninitialized = iota
+	managerInitializing
+	managerInitialized
+)
+
+func newManager(numLoops int) *manager {
+	m := new(manager)
+	m.SetLoadBalance(RoundRobin)
+	m.SetNumLoops(numLoops)
+	return m
+}
+
 // LoadBalance is used to do load balancing among multiple pollers.
 // a single poller may not be optimal if the number of cores is large (40C+).
 type manager struct {
-	NumLoops int
+	numLoops int32
+	status   int32 // 0: uninitialized, 1: initializing, 2: initialized
 	balance  loadbalance // load balancing method
 	polls    []Poll      // all the polls
 }
 
 // SetNumLoops will return error when set numLoops < 1
-func (m *manager) SetNumLoops(numLoops int) error {
+func (m *manager) SetNumLoops(numLoops int) (err error) {
 	if numLoops < 1 {
 		return fmt.Errorf("set invalid numLoops[%d]", numLoops)
 	}
-
-	if numLoops < m.NumLoops {
-		// if less than, close the redundant pollers
-		var polls = make([]Poll, numLoops)
-		for idx := 0; idx < m.NumLoops; idx++ {
-			if idx < numLoops {
-				polls[idx] = m.polls[idx]
-			} else {
-				if err := m.polls[idx].Close(); err != nil {
-					logger.Printf("NETPOLL: poller close failed: %v\n", err)
-				}
-			}
-		}
-		m.NumLoops = numLoops
-		m.polls = polls
-		m.balance.Rebalance(m.polls)
-		return nil
-	}
-
-	m.NumLoops = numLoops
-	return m.Run()
+	// note: store the new numLoops first, then reset the status
+	atomic.StoreInt32(&m.numLoops, int32(numLoops))
+	atomic.StoreInt32(&m.status, managerUninitialized)
+	return nil
 }
 
 // SetLoadBalance set load balance.
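
Note: with the hunks above, SetNumLoops and SetLoadBalance only record
state; no poller goroutine is created until the first Pick. A minimal
usage sketch of the eager path follows. SetNumLoops, SetLoadBalance,
RoundRobin, and Initialize are the APIs touched by this patch; the
module path and the main() wiring are assumptions for illustration:

    package main

    import (
        "log"

        "github.com/cloudwego/netpoll" // assumed module path
    )

    func main() {
        // With lazy init, no poller goroutines exist yet, so these
        // settings simply record state; they take effect when the
        // pollers are created on first use.
        if err := netpoll.SetNumLoops(4); err != nil {
            log.Fatal(err)
        }
        if err := netpoll.SetLoadBalance(netpoll.RoundRobin); err != nil {
            log.Fatal(err)
        }
        // Optional: create the pollers now instead of paying the cost
        // on the first Pick (i.e., on the first connection).
        netpoll.Initialize()
    }
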
@@ -96,14 +95,14 @@ func (m *manager) SetLoadBalance(lb LoadBalance) error {
 }
 
 // Close release all resources.
-func (m *manager) Close() error {
+func (m *manager) Close() (err error) {
 	for _, poll := range m.polls {
-		poll.Close()
+		err = poll.Close()
 	}
-	m.NumLoops = 0
+	m.numLoops = 0
 	m.balance = nil
 	m.polls = nil
-	return nil
+	return err
 }
 
 // Run all pollers.
@@ -114,16 +113,34 @@ func (m *manager) Run() (err error) {
 		}
 	}()
 
-	// new poll to fill delta.
-	for idx := len(m.polls); idx < m.NumLoops; idx++ {
-		var poll Poll
-		poll, err = openPoll()
-		if err != nil {
-			return
+	numLoops := int(atomic.LoadInt32(&m.numLoops))
+	if numLoops == len(m.polls) {
+		return nil
+	}
+	var polls = make([]Poll, numLoops)
+	if numLoops < len(m.polls) {
+		// shrink polls
+		copy(polls, m.polls[:numLoops])
+		for idx := numLoops; idx < len(m.polls); idx++ {
+			// close the redundant polls
+			if err = m.polls[idx].Close(); err != nil {
+				logger.Printf("NETPOLL: poller close failed: %v\n", err)
+			}
+		}
+	} else {
+		// grow polls
+		copy(polls, m.polls)
+		for idx := len(m.polls); idx < numLoops; idx++ {
+			var poll Poll
+			poll, err = openPoll()
+			if err != nil {
+				return err
+			}
+			polls[idx] = poll
+			go poll.Wait()
 		}
-		m.polls = append(m.polls, poll)
-		go poll.Wait()
 	}
+	m.polls = polls
 
 	// LoadBalance must be set before calling Run, otherwise it will panic.
 	m.balance.Rebalance(m.polls)
@@ -141,5 +158,24 @@ func (m *manager) Reset() error {
 
 // Pick will select the poller for use each time based on the LoadBalance.
 func (m *manager) Pick() Poll {
+START:
+	// fast path
+	if atomic.LoadInt32(&m.status) == managerInitialized {
+		return m.balance.Pick()
+	}
+	// slow path: if we fail to grab the initializing lock, another goroutine
+	// is doing the init work; wait for it to finish, then try again
+	if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) {
+		runtime.Gosched()
+		goto START
+	}
+	// adjust polls
+	// m.Run() finishes very quickly, so not many goroutines will block on Pick.
+	_ = m.Run()
+
+	if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) {
+		// SetNumLoops was called during m.Run(), so the CAS failed.
+		// The polls will be adjusted on the next Pick.
+	}
 	return m.balance.Pick()
 }
diff --git a/poll_manager_test.go b/poll_manager_test.go
index f79f3003..63559051 100644
--- a/poll_manager_test.go
+++ b/poll_manager_test.go
@@ -18,6 +18,8 @@ package netpoll
 
 import (
+	"runtime"
+	"sync"
 	"testing"
 	"time"
 )
@@ -45,9 +47,47 @@ func TestPollManager(t *testing.T) {
 }
 
 func TestPollManagerReset(t *testing.T) {
-	n := pollmanager.NumLoops
+	n := pollmanager.numLoops
 	err := pollmanager.Reset()
 	MustNil(t, err)
-	Equal(t, len(pollmanager.polls), n)
-	Equal(t, pollmanager.NumLoops, n)
+	Equal(t, len(pollmanager.polls), int(n))
+}
+
+func TestPollManagerSetNumLoops(t *testing.T) {
+	pm := newManager(1)
+
+	startGs := runtime.NumGoroutine()
+	poll := pm.Pick()
+	newGs := runtime.NumGoroutine()
+	Assert(t, poll != nil)
+	Assert(t, newGs-startGs == 1, newGs, startGs)
+	t.Logf("old=%d, new=%d", startGs, newGs)
+
+	// change the number of pollers; nothing should happen until the next Pick
+	oldGs := newGs
+	err := pm.SetNumLoops(100)
+	MustNil(t, err)
+	newGs = runtime.NumGoroutine()
+	t.Logf("old=%d, new=%d", oldGs, newGs)
+	Assert(t, newGs == oldGs)
+
+	// concurrent Picks trigger the polls adjustment exactly once
+	var wg sync.WaitGroup
+	finish := make(chan struct{})
+	oldGs = startGs + 32 // account for the 32 goroutines spawned below
+	for i := 0; i < 32; i++ {
+		wg.Add(1)
+		go func() {
+			poll := pm.Pick()
+			newGs := runtime.NumGoroutine()
+			t.Logf("old=%d, new=%d", oldGs, newGs)
+			Assert(t, poll != nil)
+			Assert(t, newGs-oldGs == 100)
+			Assert(t, len(pm.polls) == 100)
+			wg.Done()
+			<-finish // hold the goroutines so the count stays stable
+		}()
+	}
+	wg.Wait()
+	close(finish)
+}
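
Note: Pick() above is a status-gated lazy initialization: a fast path
that only does an atomic load, and a slow path where exactly one
goroutine wins the CAS and runs m.Run() while the others yield and
retry. A self-contained sketch of the same pattern (names such as pool
and ensureInit are illustrative, not part of netpoll):

    package main

    import (
        "fmt"
        "runtime"
        "sync"
        "sync/atomic"
    )

    const (
        uninitialized int32 = iota
        initializing
        initialized
    )

    type pool struct {
        status int32
        items  []int
    }

    // ensureInit mirrors Pick's fast-path/slow-path split.
    func (p *pool) ensureInit() {
        for {
            // fast path: already initialized
            if atomic.LoadInt32(&p.status) == initialized {
                return
            }
            // slow path: exactly one goroutine wins the CAS and initializes
            if atomic.CompareAndSwapInt32(&p.status, uninitialized, initializing) {
                p.items = []int{1, 2, 3} // the expensive setup runs once
                atomic.StoreInt32(&p.status, initialized)
                return
            }
            // another goroutine holds the initializing "lock"; yield and retry
            runtime.Gosched()
        }
    }

    func main() {
        p := new(pool)
        var wg sync.WaitGroup
        for i := 0; i < 8; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                p.ensureInit()
            }()
        }
        wg.Wait()
        fmt.Println(p.items) // [1 2 3]; initialized exactly once
    }

Unlike sync.Once, the status word can be reset (as SetNumLoops does with
managerUninitialized), which is what lets a later Pick re-run the polls
adjustment after the loop count changes.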