diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index bff66b1b..c9e4e947 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -39,10 +39,10 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v5 with: go-version: "1.20" diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index a923b418..ba4c9262 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -10,9 +10,9 @@ jobs: os: [ X64, ARM64 ] runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go }} # - uses: actions/cache@v2 @@ -28,9 +28,9 @@ jobs: windows-test: runs-on: windows-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: "1.20" # - uses: actions/cache@v2 @@ -44,9 +44,9 @@ jobs: style-test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: 1.16 - name: Check License Header diff --git a/.github/workflows/release-check.yml b/.github/workflows/release-check.yml index 7c8f5fea..0e75ca1e 100644 --- a/.github/workflows/release-check.yml +++ b/.github/workflows/release-check.yml @@ -9,7 +9,7 @@ jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Check Source Branch run: python2 -c "exit(0 if '${{ github.head_ref }}'.startswith('release') or '${{ github.head_ref }}'.startswith('hotfix') else 1)" diff --git a/connection.go b/connection.go index 515cbc60..d02f683a 100644 --- a/connection.go +++ b/connection.go @@ -19,7 +19,7 @@ import ( "time" ) -// CloseCallback will be called when the connection is closed. +// CloseCallback will be called after the connection is closed. // Return: error is unused which will be ignored directly. type CloseCallback func(connection Connection) error diff --git a/connection_errors.go b/connection_errors.go index b08ba668..1edfa21d 100644 --- a/connection_errors.go +++ b/connection_errors.go @@ -16,6 +16,7 @@ package netpoll import ( "fmt" + "net" "syscall" ) @@ -51,6 +52,10 @@ func Exception(err error, suffix string) error { return &exception{no: no, suffix: suffix} } +var ( + _ net.Error = (*exception)(nil) +) + type exception struct { no syscall.Errno suffix string @@ -88,6 +93,21 @@ func (e *exception) Unwrap() error { return e.no } +func (e *exception) Timeout() bool { + switch e.no { + case ErrDialTimeout, ErrReadTimeout, ErrWriteTimeout: + return true + } + if e.no.Timeout() { + return true + } + return false +} + +func (e *exception) Temporary() bool { + return e.no.Temporary() +} + // Errors defined in netpoll var errnos = [...]string{ ErrnoMask & ErrConnClosed: "connection has been closed", diff --git a/connection_impl.go b/connection_impl.go index eb99f1c1..b683b4df 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -45,8 +45,9 @@ type connection struct { outputBuffer *LinkBuffer outputBarrier *barrier supportZeroCopy bool - maxSize int // The maximum size of data between two Release(). - bookSize int // The size of data that can be read at once. 
+ maxSize int // The maximum size of data between two Release(). + bookSize int // The size of data that can be read at once. + state int32 // 0: not connected, 1: connected, 2: disconnected. Connection state should be changed sequentially. } var ( @@ -323,6 +324,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) { c.bookSize, c.maxSize = pagesize, pagesize c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer() c.outputBarrier = barrierPool.Get().(*barrier) + c.state = 0 c.initNetFD(conn) // conn must be *netFD{} c.initFDOperator() @@ -447,7 +449,7 @@ func (c *connection) waitReadWithTimeout(n int) (err error) { return Exception(ErrReadTimeout, c.remoteAddr.String()) case err = <-c.readTrigger: if err != nil { - return err + goto RET } continue } diff --git a/connection_lock.go b/connection_lock.go index 4b0f7360..b6e6fa3c 100644 --- a/connection_lock.go +++ b/connection_lock.go @@ -45,6 +45,7 @@ type key int32 const ( closing key = iota + connecting processing flushing // total must be at the bottom. diff --git a/connection_onevent.go b/connection_onevent.go index 9b87f01b..35b7c001 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -26,6 +26,10 @@ import ( var runTask = gopool.CtxGo +func setRunner(runner func(ctx context.Context, f func())) { + runTask = runner +} + func disableGopool() error { runTask = func(ctx context.Context, f func()) { go f() @@ -44,10 +48,11 @@ type gracefulExit interface { // OnPrepare, OnRequest, CloseCallback share the lock processing, // which is a CAS lock and can only be cleared by OnRequest. type onEvent struct { - ctx context.Context - onConnectCallback atomic.Value - onRequestCallback atomic.Value - closeCallbacks atomic.Value // value is latest *callbackNode + ctx context.Context + onConnectCallback atomic.Value + onDisconnectCallback atomic.Value + onRequestCallback atomic.Value + closeCallbacks atomic.Value // value is latest *callbackNode } type callbackNode struct { @@ -63,6 +68,14 @@ func (c *connection) SetOnConnect(onConnect OnConnect) error { return nil } +// SetOnDisconnect set the OnDisconnect callback. +func (c *connection) SetOnDisconnect(onDisconnect OnDisconnect) error { + if onDisconnect != nil { + c.onDisconnectCallback.Store(onDisconnect) + } + return nil +} + // SetOnRequest initialize ctx when setting OnRequest. 
func (c *connection) SetOnRequest(onRequest OnRequest) error { if onRequest == nil { @@ -95,6 +108,7 @@ func (c *connection) AddCloseCallback(callback CloseCallback) error { func (c *connection) onPrepare(opts *options) (err error) { if opts != nil { c.SetOnConnect(opts.onConnect) + c.SetOnDisconnect(opts.onDisconnect) c.SetOnRequest(opts.onRequest) c.SetReadTimeout(opts.readTimeout) c.SetWriteTimeout(opts.writeTimeout) @@ -120,23 +134,36 @@ func (c *connection) onPrepare(opts *options) (err error) { func (c *connection) onConnect() { var onConnect, _ = c.onConnectCallback.Load().(OnConnect) if onConnect == nil { + atomic.StoreInt32(&c.state, 1) + return + } + if !c.lock(connecting) { + // it never happens because onDisconnect will not lock connecting if c.connected == 0 return } var onRequest, _ = c.onRequestCallback.Load().(OnRequest) - var connected int32 c.onProcess( // only process when conn active and have unread data func(c *connection) bool { // if onConnect not called - if atomic.LoadInt32(&connected) == 0 { + if atomic.LoadInt32(&c.state) == 0 { return true } // check for onRequest return onRequest != nil && c.Reader().Len() > 0 }, func(c *connection) { - if atomic.CompareAndSwapInt32(&connected, 0, 1) { + if atomic.CompareAndSwapInt32(&c.state, 0, 1) { c.ctx = onConnect(c.ctx, c) + + if !c.IsActive() && atomic.CompareAndSwapInt32(&c.state, 1, 2) { + // since we hold connecting lock, so we should help to call onDisconnect here + var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect) + if onDisconnect != nil { + onDisconnect(c.ctx, c) + } + } + c.unlock(connecting) return } if onRequest != nil { @@ -146,12 +173,44 @@ func (c *connection) onConnect() { ) } +// when onDisconnect called, c.IsActive() must return false +func (c *connection) onDisconnect() { + var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect) + if onDisconnect == nil { + return + } + var onConnect, _ = c.onConnectCallback.Load().(OnConnect) + if onConnect == nil { + // no need lock if onConnect is nil + atomic.StoreInt32(&c.state, 2) + onDisconnect(c.ctx, c) + return + } + // check if OnConnect finished when onConnect != nil && onDisconnect != nil + if atomic.LoadInt32(&c.state) > 0 && c.lock(connecting) { // means OnConnect already finished + // protect onDisconnect run once + // if CAS return false, means OnConnect already helps to run onDisconnect + if atomic.CompareAndSwapInt32(&c.state, 1, 2) { + onDisconnect(c.ctx, c) + } + c.unlock(connecting) + return + } + // OnConnect is not finished yet, return and let onConnect helps to call onDisconnect + return +} + // onRequest is responsible for executing the closeCallbacks after the connection has been closed. func (c *connection) onRequest() (needTrigger bool) { var onRequest, ok = c.onRequestCallback.Load().(OnRequest) if !ok { return true } + // wait onConnect finished first + if atomic.LoadInt32(&c.state) == 0 && c.onConnectCallback.Load() != nil { + // let onConnect to call onRequest + return + } processed := c.onProcess( // only process when conn active and have unread data func(c *connection) bool { @@ -206,7 +265,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f } process(c) } - // Handling callback if connection has been closed. + // handling callback if connection has been closed. 
if closedBy != none { // if closed by user when processing, it "may" needs detach needDetach := closedBy == user @@ -219,7 +278,17 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f return } c.unlock(processing) - // Double check when exiting. + // Note: Poller's closeCallback call will try to get processing lock failed but here already neer to unlock processing. + // So here we need to check connection state again, to avoid connection leak + // double check close state + if c.status(closing) != 0 && c.lock(processing) { + // poller will get the processing lock failed, here help poller do closeCallback + // fd must already detach by poller + c.closeCallback(false, false) + panicked = false + return + } + // double check isProcessable if isProcessable(c) && c.lock(processing) { goto START } @@ -227,7 +296,6 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f panicked = false return } - runTask(c.ctx, task) return true } diff --git a/connection_reactor.go b/connection_reactor.go index cd5d717c..25b4dec5 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -30,6 +30,10 @@ func (c *connection) onHup(p Poll) error { } c.triggerRead(Exception(ErrEOF, "peer close")) c.triggerWrite(Exception(ErrConnClosed, "peer close")) + + // call Disconnect callback first + c.onDisconnect() + // It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively. // It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing, // and it is safe to close the buffer at this time. diff --git a/connection_test.go b/connection_test.go index 782e85c2..548d98a2 100644 --- a/connection_test.go +++ b/connection_test.go @@ -504,10 +504,10 @@ func TestParallelShortConnection(t *testing.T) { var received int64 el, err := NewEventLoop(func(ctx context.Context, connection Connection) error { data, err := connection.Reader().Next(connection.Reader().Len()) + atomic.AddInt64(&received, int64(len(data))) if err != nil { return err } - atomic.AddInt64(&received, int64(len(data))) //t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive()) return nil }) @@ -536,10 +536,13 @@ func TestParallelShortConnection(t *testing.T) { } wg.Wait() - for atomic.LoadInt64(&received) < int64(totalSize) { + count := 100 + for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) { t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize) time.Sleep(time.Millisecond * 100) + count-- } + Equal(t, atomic.LoadInt64(&received), int64(totalSize)) } func TestConnectionServerClose(t *testing.T) { diff --git a/eventloop.go b/eventloop.go index c9a903c0..425cd95f 100644 --- a/eventloop.go +++ b/eventloop.go @@ -34,6 +34,20 @@ type EventLoop interface { Shutdown(ctx context.Context) error } +/* The Connection Callback Sequence Diagram +| Connection State | Callback Function | Notes +| Connected but not initialized | OnPrepare | Conn is not registered into poller +| Connected and initialized | OnConnect | Conn is ready for read or write +| Read first byte | OnRequest | Conn is ready for read or write +| Peer closed but conn is active | OnDisconnect | Conn access will race with OnRequest function +| Self closed and conn is closed | CloseCallback | Conn is destroyed + +Execution Order: + OnPrepare => OnConnect => OnRequest => CloseCallback + OnDisconnect +Note: only OnRequest and OnDisconnect will be executed in parallel +*/ + // OnPrepare is 
used to inject custom preparation at connection initialization, // which is optional but important in some scenarios. For example, a qps limiter // can be set by closing overloaded connections directly in OnPrepare. @@ -63,6 +77,11 @@ type OnPrepare func(connection Connection) context.Context // } type OnConnect func(ctx context.Context, connection Connection) context.Context +// OnDisconnect is called once connection is going to be closed. +// OnDisconnect must return as quick as possible because it will block poller. +// OnDisconnect is different from CloseCallback, you could check with "The Connection Callback Sequence Diagram" section. +type OnDisconnect func(ctx context.Context, connection Connection) + // OnRequest defines the function for handling connection. When data is sent from the connection peer, // netpoll actively reads the data in LT mode and places it in the connection's input buffer. // Generally, OnRequest starts handling the data in the following way: diff --git a/netpoll_options.go b/netpoll_options.go index ec384f54..2cdb1c13 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -18,6 +18,7 @@ package netpoll import ( + "context" "io" "time" ) @@ -39,15 +40,26 @@ func SetNumLoops(numLoops int) error { // SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt // to distribute the incoming connections between multiple polls. -// This option only works when NumLoops is set. +// This option only works when numLoops is set. func SetLoadBalance(lb LoadBalance) error { return setLoadBalance(lb) } +// Initialize the pollers actively. By default, it's lazy initialized. +// It's safe to call it multi times. +func Initialize() { + initialize() +} + func SetLoggerOutput(w io.Writer) { setLoggerOutput(w) } +// SetRunner set the runner function for every OnRequest/OnConnect callback +func SetRunner(f func(ctx context.Context, f func())) { + setRunner(f) +} + // DisableGopool will remove gopool(the goroutine pool used to run OnRequest), // which means that OnRequest will be run via `go OnRequest(...)`. // Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. @@ -71,6 +83,13 @@ func WithOnConnect(onConnect OnConnect) Option { }} } +// WithOnDisconnect registers the OnDisconnect method to EventLoop. +func WithOnDisconnect(onDisconnect OnDisconnect) Option { + return Option{func(op *options) { + op.onDisconnect = onDisconnect + }} +} + // WithReadTimeout sets the read timeout of connections. 
func WithReadTimeout(timeout time.Duration) Option { return Option{func(op *options) { @@ -100,6 +119,7 @@ type Option struct { type options struct { onPrepare OnPrepare onConnect OnConnect + onDisconnect OnDisconnect onRequest OnRequest readTimeout time.Duration writeTimeout time.Duration diff --git a/netpoll_test.go b/netpoll_test.go index 0467e879..fb985604 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -136,6 +136,116 @@ func TestOnConnectWrite(t *testing.T) { MustNil(t, err) } +func TestOnDisconnect(t *testing.T) { + type ctxKey struct{} + var network, address = "tcp", ":8888" + var canceled, closed int32 + var conns int32 = 100 + req := "ping" + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + cancelFunc, _ := ctx.Value(ctxKey{}).(context.CancelFunc) + MustTrue(t, cancelFunc != nil) + Assert(t, ctx.Done() != nil) + + buf, err := connection.Reader().Next(4) // should consumed all data + MustNil(t, err) + Equal(t, string(buf), req) + select { + case <-ctx.Done(): + atomic.AddInt32(&canceled, 1) + case <-time.After(time.Second): + } + return nil + }, + WithOnConnect(func(ctx context.Context, conn Connection) context.Context { + conn.AddCloseCallback(func(connection Connection) error { + atomic.AddInt32(&closed, 1) + return nil + }) + ctx, cancel := context.WithCancel(ctx) + return context.WithValue(ctx, ctxKey{}, cancel) + }), + WithOnDisconnect(func(ctx context.Context, conn Connection) { + cancelFunc, _ := ctx.Value(ctxKey{}).(context.CancelFunc) + MustTrue(t, cancelFunc != nil) + cancelFunc() + }), + ) + + for i := int32(0); i < conns; i++ { + var conn, err = DialConnection(network, address, time.Second) + MustNil(t, err) + + _, err = conn.Writer().WriteString(req) + MustNil(t, err) + err = conn.Writer().Flush() + MustNil(t, err) + + err = conn.Close() + MustNil(t, err) + } + for atomic.LoadInt32(&closed) < conns { + t.Logf("closed: %d, canceled: %d", atomic.LoadInt32(&closed), atomic.LoadInt32(&canceled)) + runtime.Gosched() + } + Equal(t, atomic.LoadInt32(&closed), conns) + Equal(t, atomic.LoadInt32(&canceled), conns) + + err := loop.Shutdown(context.Background()) + MustNil(t, err) +} + +func TestOnDisconnectWhenOnConnect(t *testing.T) { + type ctxPrepareKey struct{} + type ctxConnectKey struct{} + var network, address = "tcp", ":8888" + var conns int32 = 100 + var wg sync.WaitGroup + wg.Add(int(conns) * 3) + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + _, _ = connection.Reader().Next(connection.Reader().Len()) + return nil + }, + WithOnPrepare(func(connection Connection) context.Context { + defer wg.Done() + var counter int32 + return context.WithValue(context.Background(), ctxPrepareKey{}, &counter) + }), + WithOnConnect(func(ctx context.Context, conn Connection) context.Context { + defer wg.Done() + t.Logf("OnConnect: %v", conn.RemoteAddr()) + time.Sleep(time.Millisecond * 10) // wait for closed called + counter := ctx.Value(ctxPrepareKey{}).(*int32) + ok := atomic.CompareAndSwapInt32(counter, 0, 1) + Assert(t, ok) + return context.WithValue(ctx, ctxConnectKey{}, "123") + }), + WithOnDisconnect(func(ctx context.Context, conn Connection) { + defer wg.Done() + t.Logf("OnDisconnect: %v", conn.RemoteAddr()) + counter, _ := ctx.Value(ctxPrepareKey{}).(*int32) + ok := atomic.CompareAndSwapInt32(counter, 1, 2) + Assert(t, ok) + v := ctx.Value(ctxConnectKey{}).(string) + Equal(t, v, "123") + }), + ) + + for i := int32(0); i < conns; i++ { + var conn, err = 
DialConnection(network, address, time.Second) + MustNil(t, err) + err = conn.Close() + t.Logf("Close: %v", conn.LocalAddr()) + MustNil(t, err) + } + + wg.Wait() + err := loop.Shutdown(context.Background()) + MustNil(t, err) +} + func TestGracefulExit(t *testing.T) { var network, address = "tcp", ":8888" diff --git a/poll_manager.go b/poll_manager.go index 119187c0..4183ac3d 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -23,6 +23,7 @@ import ( "log" "os" "runtime" + "sync/atomic" ) func setNumLoops(numLoops int) error { @@ -33,57 +34,55 @@ func setLoadBalance(lb LoadBalance) error { return pollmanager.SetLoadBalance(lb) } +func initialize() { + // The first call of Pick() will init pollers + _ = pollmanager.Pick() +} + func setLoggerOutput(w io.Writer) { logger = log.New(w, "", log.LstdFlags) } -// manage all pollers +// pollmanager manage all pollers var pollmanager *manager var logger *log.Logger func init() { - var loops = runtime.GOMAXPROCS(0)/20 + 1 - pollmanager = &manager{} - pollmanager.SetLoadBalance(RoundRobin) - pollmanager.SetNumLoops(loops) - + pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) setLoggerOutput(os.Stderr) } +const ( + managerUninitialized = iota + managerInitializing + managerInitialized +) + +func newManager(numLoops int) *manager { + m := new(manager) + m.SetLoadBalance(RoundRobin) + m.SetNumLoops(numLoops) + return m +} + // LoadBalance is used to do load balancing among multiple pollers. // a single poller may not be optimal if the number of cores is large (40C+). type manager struct { - NumLoops int + numLoops int32 + status int32 // 0: uninitialized, 1: initializing, 2: initialized balance loadbalance // load balancing method polls []Poll // all the polls } // SetNumLoops will return error when set numLoops < 1 -func (m *manager) SetNumLoops(numLoops int) error { +func (m *manager) SetNumLoops(numLoops int) (err error) { if numLoops < 1 { return fmt.Errorf("set invalid numLoops[%d]", numLoops) } - - if numLoops < m.NumLoops { - // if less than, close the redundant pollers - var polls = make([]Poll, numLoops) - for idx := 0; idx < m.NumLoops; idx++ { - if idx < numLoops { - polls[idx] = m.polls[idx] - } else { - if err := m.polls[idx].Close(); err != nil { - logger.Printf("NETPOLL: poller close failed: %v\n", err) - } - } - } - m.NumLoops = numLoops - m.polls = polls - m.balance.Rebalance(m.polls) - return nil - } - - m.NumLoops = numLoops - return m.Run() + // note: set new numLoops first and then change the status + atomic.StoreInt32(&m.numLoops, int32(numLoops)) + atomic.StoreInt32(&m.status, managerUninitialized) + return nil } // SetLoadBalance set load balance. @@ -96,14 +95,14 @@ func (m *manager) SetLoadBalance(lb LoadBalance) error { } // Close release all resources. -func (m *manager) Close() error { +func (m *manager) Close() (err error) { for _, poll := range m.polls { - poll.Close() + err = poll.Close() } - m.NumLoops = 0 + m.numLoops = 0 m.balance = nil m.polls = nil - return nil + return err } // Run all pollers. @@ -114,16 +113,34 @@ func (m *manager) Run() (err error) { } }() - // new poll to fill delta. 
- for idx := len(m.polls); idx < m.NumLoops; idx++ { - var poll Poll - poll, err = openPoll() - if err != nil { - return + numLoops := int(atomic.LoadInt32(&m.numLoops)) + if numLoops == len(m.polls) { + return nil + } + var polls = make([]Poll, numLoops) + if numLoops < len(m.polls) { + // shrink polls + copy(polls, m.polls[:numLoops]) + for idx := numLoops; idx < len(m.polls); idx++ { + // close redundant polls + if err = m.polls[idx].Close(); err != nil { + logger.Printf("NETPOLL: poller close failed: %v\n", err) + } + } + } else { + // growth polls + copy(polls, m.polls) + for idx := len(m.polls); idx < numLoops; idx++ { + var poll Poll + poll, err = openPoll() + if err != nil { + return err + } + polls[idx] = poll + go poll.Wait() } - m.polls = append(m.polls, poll) - go poll.Wait() } + m.polls = polls // LoadBalance must be set before calling Run, otherwise it will panic. m.balance.Rebalance(m.polls) @@ -141,5 +158,24 @@ func (m *manager) Reset() error { // Pick will select the poller for use each time based on the LoadBalance. func (m *manager) Pick() Poll { +START: + // fast path + if atomic.LoadInt32(&m.status) == managerInitialized { + return m.balance.Pick() + } + // slow path + // try to get initializing lock failed, wait others finished the init work, and try again + if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) { + runtime.Gosched() + goto START + } + // adjust polls + // m.Run() will finish very quickly, so will not many goroutines block on Pick. + _ = m.Run() + + if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) { + // SetNumLoops called during m.Run() which cause CAS failed + // The polls will be adjusted next Pick + } return m.balance.Pick() } diff --git a/poll_manager_test.go b/poll_manager_test.go index f79f3003..63559051 100644 --- a/poll_manager_test.go +++ b/poll_manager_test.go @@ -18,6 +18,8 @@ package netpoll import ( + "runtime" + "sync" "testing" "time" ) @@ -45,9 +47,47 @@ func TestPollManager(t *testing.T) { } func TestPollManagerReset(t *testing.T) { - n := pollmanager.NumLoops + n := pollmanager.numLoops err := pollmanager.Reset() MustNil(t, err) - Equal(t, len(pollmanager.polls), n) - Equal(t, pollmanager.NumLoops, n) + Equal(t, len(pollmanager.polls), int(n)) +} + +func TestPollManagerSetNumLoops(t *testing.T) { + pm := newManager(1) + + startGs := runtime.NumGoroutine() + poll := pm.Pick() + newGs := runtime.NumGoroutine() + Assert(t, poll != nil) + Assert(t, newGs-startGs == 1, newGs, startGs) + t.Logf("old=%d, new=%d", startGs, newGs) + + // change pollers + oldGs := newGs + err := pm.SetNumLoops(100) + MustNil(t, err) + newGs = runtime.NumGoroutine() + t.Logf("old=%d, new=%d", oldGs, newGs) + Assert(t, newGs == oldGs) + + // trigger polls adjustment + var wg sync.WaitGroup + finish := make(chan struct{}) + oldGs = startGs + 32 // 32 self goroutines + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + poll := pm.Pick() + newGs := runtime.NumGoroutine() + t.Logf("old=%d, new=%d", oldGs, newGs) + Assert(t, poll != nil) + Assert(t, newGs-oldGs == 100) + Assert(t, len(pm.polls) == 100) + wg.Done() + <-finish // hold goroutines + }() + } + wg.Wait() + close(finish) } diff --git a/sys_exec.go b/sys_exec.go index 1c8e40e4..8a7c5784 100644 --- a/sys_exec.go +++ b/sys_exec.go @@ -94,8 +94,10 @@ func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { } // TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is -// 1024 and this seems conservative enough for now. 
Darwin's -// UIO_MAXIOV also seems to be 1024. +// +// 1024 and this seems conservative enough for now. Darwin's +// UIO_MAXIOV also seems to be 1024. +// // iovecs limit length to 2GB(2^31) func iovecs(bs [][]byte, ivs []syscall.Iovec) (iovLen int) { totalLen := 0 diff --git a/sys_exec_test.go b/sys_exec_test.go index 54398df1..f35bb6e0 100644 --- a/sys_exec_test.go +++ b/sys_exec_test.go @@ -105,18 +105,23 @@ func TestReadv(t *testing.T) { w3, _ := syscall.Write(w, vs[2]) Equal(t, w1+w2+w3, 31) - var barrier = barrier{} - barrier.bs = [][]byte{ + var barrier = barrier{ + bs: make([][]byte, 4), + } + res := [][]byte{ make([]byte, 0), make([]byte, 10), make([]byte, 11), make([]byte, 10), } + for i := range res { + barrier.bs[i] = res[i] + } barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) rn, err := readv(r, barrier.bs, barrier.ivs) MustNil(t, err) Equal(t, rn, 31) - for i, v := range barrier.bs { + for i, v := range res { t.Logf("READ [%d] %s", i, v) } }
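
Usage note: the OnDisconnect hook introduced above (eventloop.go, netpoll_options.go, connection_onevent.go) is intended for cancelling per-connection work as soon as the peer closes, before CloseCallback runs. Below is a minimal sketch modelled on TestOnDisconnect from this patch; the listener address, log messages, and ctxCancelKey type are illustrative, and CreateListener/Serve come from the existing netpoll API rather than from this change.

package main

import (
	"context"
	"log"

	"github.com/cloudwego/netpoll"
)

type ctxCancelKey struct{}

func main() {
	// Illustrative address; CreateListener/Serve are pre-existing netpoll API.
	listener, err := netpoll.CreateListener("tcp", "127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}

	eventLoop, err := netpoll.NewEventLoop(
		func(ctx context.Context, conn netpoll.Connection) error {
			data, err := conn.Reader().Next(conn.Reader().Len())
			if err != nil {
				return err
			}
			select {
			case <-ctx.Done():
				// OnDisconnect cancelled this context: the peer is already gone.
				log.Printf("peer closed, dropping %d bytes", len(data))
			default:
				log.Printf("handled %d bytes", len(data))
			}
			return nil
		},
		netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context {
			// Derive a cancellable context once the connection is usable; the same
			// ctx is later passed to OnRequest and OnDisconnect.
			ctx, cancel := context.WithCancel(ctx)
			return context.WithValue(ctx, ctxCancelKey{}, cancel)
		}),
		netpoll.WithOnDisconnect(func(ctx context.Context, conn netpoll.Connection) {
			// Peer closed: cancel any in-flight work tied to this connection.
			if cancel, ok := ctx.Value(ctxCancelKey{}).(context.CancelFunc); ok {
				cancel()
			}
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	if err := eventLoop.Serve(listener); err != nil {
		log.Fatal(err)
	}
}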
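
Because exception now satisfies net.Error (connection_errors.go), callers should be able to detect netpoll timeouts with the standard errors.As idiom instead of comparing errno values. The sketch below assumes dial/read timeout errors are wrapped in exception (as ErrDialTimeout/ErrReadTimeout are in this patch); the unroutable address and timeout values are placeholders, and a refused local dial would fail immediately rather than time out.

package main

import (
	"errors"
	"log"
	"net"
	"time"

	"github.com/cloudwego/netpoll"
)

// isTimeout reports whether err (or anything it wraps) is a timeout
// according to the net.Error interface that exception now implements.
func isTimeout(err error) bool {
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

func main() {
	// Unroutable address chosen only to provoke a dial timeout.
	conn, err := netpoll.DialConnection("tcp", "10.255.255.1:8080", 50*time.Millisecond)
	if err != nil {
		if isTimeout(err) {
			log.Println("dial timed out:", err)
			return
		}
		log.Fatal(err)
	}
	defer conn.Close()

	_ = conn.SetReadTimeout(100 * time.Millisecond)
	if _, err := conn.Reader().Next(1); err != nil && isTimeout(err) {
		log.Println("read timed out:", err) // wraps ErrReadTimeout
	}
}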
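
SetRunner and Initialize (netpoll_options.go) let an application swap the callback runner and force eager poller setup instead of the default lazy initialization on first Pick(). A sketch of how they might be wired at program start, assuming configuration happens before any listener or connection is created; the plain-goroutine runner is only an example of what could be plugged in.

package main

import (
	"context"

	"github.com/cloudwego/netpoll"
)

func main() {
	// Route every OnConnect/OnRequest callback through a custom runner
	// (e.g. an application-owned worker pool) instead of the default gopool.
	netpoll.SetRunner(func(ctx context.Context, f func()) {
		go f() // example runner: one goroutine per task
	})

	// Tune the pollers before any connection exists...
	_ = netpoll.SetNumLoops(4)
	_ = netpoll.SetLoadBalance(netpoll.RoundRobin)

	// ...then initialize them eagerly rather than on the first Pick().
	netpoll.Initialize()
}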
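
The rewritten manager.Pick (poll_manager.go) relies on a three-state status word: a fast-path atomic load, a CAS from uninitialized to initializing for the goroutine that does the work, and Gosched-and-retry for goroutines that lose the race. The pattern in isolation, as a self-contained sketch with placeholder types rather than the real manager:

package main

import (
	"runtime"
	"sync/atomic"
)

const (
	uninitialized int32 = iota
	initializing
	initialized
)

type lazy struct {
	status int32
	value  []int // stands in for the poller slice
}

func (l *lazy) get() []int {
	for {
		// Fast path: already initialized, no locking needed.
		if atomic.LoadInt32(&l.status) == initialized {
			return l.value
		}
		// Claim the right to initialize; exactly one goroutine wins.
		if atomic.CompareAndSwapInt32(&l.status, uninitialized, initializing) {
			l.value = make([]int, 4) // stands in for m.Run()
			atomic.StoreInt32(&l.status, initialized)
			return l.value
		}
		// Someone else is initializing; yield and retry.
		runtime.Gosched()
	}
}

func main() { _ = (&lazy{}).get() }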