From 833a795460d2772d0aeba56ac9dd984adeb0e6c6 Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 18 Oct 2023 15:09:28 +0800 Subject: [PATCH 01/11] fix: panic/fault when dial connection timeout (#289) --- connection_test.go | 37 +++++++++++++++++++++++++++++++++++++ net_polldesc.go | 9 ++++++--- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/connection_test.go b/connection_test.go index 6de6f017..782e85c2 100644 --- a/connection_test.go +++ b/connection_test.go @@ -25,6 +25,7 @@ import ( "net" "os" "runtime" + "strings" "sync" "sync/atomic" "syscall" @@ -638,3 +639,39 @@ func TestConnectionServerClose(t *testing.T) { //time.Sleep(time.Second) wg.Wait() } + +func TestConnectionDailTimeoutAndClose(t *testing.T) { + ln, err := createTestListener("tcp", ":12345") + MustNil(t, err) + defer ln.Close() + + el, err := NewEventLoop( + func(ctx context.Context, connection Connection) error { + _, err = connection.Reader().Next(connection.Reader().Len()) + return err + }, + ) + defer el.Shutdown(context.Background()) + go func() { + err := el.Serve(ln) + if err != nil { + t.Logf("servce end with error: %v", err) + } + }() + + loops := 100 + conns := 100 + for l := 0; l < loops; l++ { + var wg sync.WaitGroup + wg.Add(conns) + for i := 0; i < conns; i++ { + go func() { + defer wg.Done() + conn, err := DialConnection("tcp", ":12345", time.Nanosecond) + Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout")) + _ = conn + }() + } + wg.Wait() + } +} diff --git a/net_polldesc.go b/net_polldesc.go index dfd95de1..c197850e 100644 --- a/net_polldesc.go +++ b/net_polldesc.go @@ -53,14 +53,17 @@ func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) { } select { - case <-pd.closeTrigger: + case <-pd.closeTrigger: // triggered by poller // no need to detach, since poller has done it in OnHup. return Exception(ErrConnClosed, "by peer") - case <-pd.writeTrigger: + case <-pd.writeTrigger: // triggered by poller err = nil - case <-ctx.Done(): + case <-ctx.Done(): // triggered by ctx // deregister from poller, upper caller function will close fd + // detach first but there's a very small possibility that operator is doing in poller, + // so need call unused() to wait operator done pd.detach() + pd.operator.unused() err = mapErr(ctx.Err()) } // double check close trigger From 4c413907033ccc1e503296ef12b500e6b7a7f12c Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 18 Oct 2023 15:46:55 +0800 Subject: [PATCH 02/11] optimise: rm useless inputBarrier (#290) --- connection_impl.go | 3 +-- connection_reactor.go | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/connection_impl.go b/connection_impl.go index 1fa1a8e4..eb99f1c1 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -43,7 +43,6 @@ type connection struct { writeTrigger chan error inputBuffer *LinkBuffer outputBuffer *LinkBuffer - inputBarrier *barrier outputBarrier *barrier supportZeroCopy bool maxSize int // The maximum size of data between two Release(). 
@@ -323,7 +322,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) { c.writeTrigger = make(chan error, 1) c.bookSize, c.maxSize = pagesize, pagesize c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer() - c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier) + c.outputBarrier = barrierPool.Get().(*barrier) c.initNetFD(conn) // conn must be *netFD{} c.initFDOperator() diff --git a/connection_reactor.go b/connection_reactor.go index fa485be1..cd5d717c 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -71,7 +71,6 @@ func (c *connection) closeBuffer() { // so we need to check the buffer length, and if it's an "unclean" close operation, let's give up to reuse the buffer if c.inputBuffer.Len() == 0 || onConnect != nil || onRequest != nil { c.inputBuffer.Close() - barrierPool.Put(c.inputBarrier) } if c.outputBuffer.Len() == 0 || onConnect != nil || onRequest != nil { c.outputBuffer.Close() From 27be26a127a0abf4bd6d65e47a8f6125ae478795 Mon Sep 17 00:00:00 2001 From: Joway Date: Thu, 26 Oct 2023 12:18:24 +0800 Subject: [PATCH 03/11] feat: add SetRunner option (#294) --- connection_onevent.go | 4 ++++ netpoll_options.go | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/connection_onevent.go b/connection_onevent.go index 9b87f01b..6f055f37 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -26,6 +26,10 @@ import ( var runTask = gopool.CtxGo +func setRunner(runner func(ctx context.Context, f func())) { + runTask = runner +} + func disableGopool() error { runTask = func(ctx context.Context, f func()) { go f() diff --git a/netpoll_options.go b/netpoll_options.go index ec384f54..023e574c 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -18,6 +18,7 @@ package netpoll import ( + "context" "io" "time" ) @@ -48,6 +49,11 @@ func SetLoggerOutput(w io.Writer) { setLoggerOutput(w) } +// SetRunner set the runner function for every OnRequest/OnConnect callback +func SetRunner(f func(ctx context.Context, f func())) { + setRunner(f) +} + // DisableGopool will remove gopool(the goroutine pool used to run OnRequest), // which means that OnRequest will be run via `go OnRequest(...)`. // Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. 
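A minimal usage sketch for the SetRunner hook added in the patch above (not part of the patch itself): SetRunner swaps out the default gopool.CtxGo task runner for a caller-supplied function, so OnRequest/OnConnect callbacks can be scheduled however the application prefers. The semaphore-backed pool below is purely illustrative.

```go
// Illustrative only: cap callback concurrency with a semaphore-backed runner.
// Any function matching SetRunner's signature works; netpoll's default is gopool.CtxGo.
package main

import (
	"context"

	"github.com/cloudwego/netpoll"
)

func main() {
	sem := make(chan struct{}, 128) // at most 128 callbacks in flight

	netpoll.SetRunner(func(ctx context.Context, f func()) {
		sem <- struct{}{} // acquire a slot; blocks when the pool is saturated
		go func() {
			defer func() { <-sem }() // release the slot when the callback returns
			f()
		}()
	})

	// ... build the EventLoop and call Serve as usual; callbacks now run on this runner.
}
```

Since SetRunner swaps a package-level function, it should be called before the event loop starts serving.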
From 97071780ae193f8156bd30c81736d91c8e701ddb Mon Sep 17 00:00:00 2001 From: Joway Date: Wed, 29 Nov 2023 17:32:44 +0800 Subject: [PATCH 04/11] fix: stop timer when read triggered by err (#296) --- connection_impl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connection_impl.go b/connection_impl.go index eb99f1c1..77212de6 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -447,7 +447,7 @@ func (c *connection) waitReadWithTimeout(n int) (err error) { return Exception(ErrReadTimeout, c.remoteAddr.String()) case err = <-c.readTrigger: if err != nil { - return err + goto RET } continue } From 654cef17e9abae548d176dedad204bc67780fbc0 Mon Sep 17 00:00:00 2001 From: Joway Date: Thu, 4 Jan 2024 13:40:26 +0800 Subject: [PATCH 05/11] feat: netpoll exception implement net.Error interface (#300) --- connection_errors.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/connection_errors.go b/connection_errors.go index b08ba668..1edfa21d 100644 --- a/connection_errors.go +++ b/connection_errors.go @@ -16,6 +16,7 @@ package netpoll import ( "fmt" + "net" "syscall" ) @@ -51,6 +52,10 @@ func Exception(err error, suffix string) error { return &exception{no: no, suffix: suffix} } +var ( + _ net.Error = (*exception)(nil) +) + type exception struct { no syscall.Errno suffix string @@ -88,6 +93,21 @@ func (e *exception) Unwrap() error { return e.no } +func (e *exception) Timeout() bool { + switch e.no { + case ErrDialTimeout, ErrReadTimeout, ErrWriteTimeout: + return true + } + if e.no.Timeout() { + return true + } + return false +} + +func (e *exception) Temporary() bool { + return e.no.Temporary() +} + // Errors defined in netpoll var errnos = [...]string{ ErrnoMask & ErrConnClosed: "connection has been closed", From a6a5330e30c5a3274a6ac0d3c26be57be12ca583 Mon Sep 17 00:00:00 2001 From: Binary_Oracle <86190302+BinaryOracle@users.noreply.github.com> Date: Tue, 23 Jan 2024 15:16:03 +0800 Subject: [PATCH 06/11] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Dsys=5Fexec=5Ftes?= =?UTF-8?q?t.go=E5=87=BD=E6=95=B0=E4=B8=ADTestReadv=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E4=BD=BF=E7=94=A8=E9=94=99=E8=AF=AF=20(#297)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sys_exec.go | 6 ++++-- sys_exec_test.go | 11 ++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sys_exec.go b/sys_exec.go index 1c8e40e4..8a7c5784 100644 --- a/sys_exec.go +++ b/sys_exec.go @@ -94,8 +94,10 @@ func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { } // TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is -// 1024 and this seems conservative enough for now. Darwin's -// UIO_MAXIOV also seems to be 1024. +// +// 1024 and this seems conservative enough for now. Darwin's +// UIO_MAXIOV also seems to be 1024. 
+// // iovecs limit length to 2GB(2^31) func iovecs(bs [][]byte, ivs []syscall.Iovec) (iovLen int) { totalLen := 0 diff --git a/sys_exec_test.go b/sys_exec_test.go index 54398df1..f35bb6e0 100644 --- a/sys_exec_test.go +++ b/sys_exec_test.go @@ -105,18 +105,23 @@ func TestReadv(t *testing.T) { w3, _ := syscall.Write(w, vs[2]) Equal(t, w1+w2+w3, 31) - var barrier = barrier{} - barrier.bs = [][]byte{ + var barrier = barrier{ + bs: make([][]byte, 4), + } + res := [][]byte{ make([]byte, 0), make([]byte, 10), make([]byte, 11), make([]byte, 10), } + for i := range res { + barrier.bs[i] = res[i] + } barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) rn, err := readv(r, barrier.bs, barrier.ivs) MustNil(t, err) Equal(t, rn, 31) - for i, v := range barrier.bs { + for i, v := range res { t.Logf("READ [%d] %s", i, v) } } From 329188df82daf5a5d111048281c8a1f21109e0e9 Mon Sep 17 00:00:00 2001 From: ning2510 <48829770+ning2510@users.noreply.github.com> Date: Tue, 23 Jan 2024 15:49:11 +0800 Subject: [PATCH 07/11] ci: bump the version of actions/checkout and actions/setup-go (#302) --- .github/workflows/codeql-analysis.yml | 4 ++-- .github/workflows/pr-check.yml | 12 ++++++------ .github/workflows/release-check.yml | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index bff66b1b..c9e4e947 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -39,10 +39,10 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v5 with: go-version: "1.20" diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index a923b418..ba4c9262 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -10,9 +10,9 @@ jobs: os: [ X64, ARM64 ] runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go }} # - uses: actions/cache@v2 @@ -28,9 +28,9 @@ jobs: windows-test: runs-on: windows-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: "1.20" # - uses: actions/cache@v2 @@ -44,9 +44,9 @@ jobs: style-test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: 1.16 - name: Check License Header diff --git a/.github/workflows/release-check.yml b/.github/workflows/release-check.yml index 7c8f5fea..0e75ca1e 100644 --- a/.github/workflows/release-check.yml +++ b/.github/workflows/release-check.yml @@ -9,7 +9,7 @@ jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Check Source Branch run: python2 -c "exit(0 if '${{ github.head_ref }}'.startswith('release') or '${{ github.head_ref }}'.startswith('hotfix') else 1)" From c3792e87d4a69a50031b42e33c0b1e44974f2080 Mon Sep 17 00:00:00 2001 From: Joway Date: Fri, 2 Feb 2024 20:01:02 +0800 Subject: [PATCH 08/11] feat: add WithOnDisconnect callback (#303) --- connection.go | 2 +- connection_onevent.go | 26 ++++++++++++++++--- connection_reactor.go | 4 +++ eventloop.go | 14 ++++++++++ netpoll_options.go | 8 ++++++ netpoll_test.go | 60 
+++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 109 insertions(+), 5 deletions(-) diff --git a/connection.go b/connection.go index 515cbc60..d02f683a 100644 --- a/connection.go +++ b/connection.go @@ -19,7 +19,7 @@ import ( "time" ) -// CloseCallback will be called when the connection is closed. +// CloseCallback will be called after the connection is closed. // Return: error is unused which will be ignored directly. type CloseCallback func(connection Connection) error diff --git a/connection_onevent.go b/connection_onevent.go index 6f055f37..10bead70 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -48,10 +48,11 @@ type gracefulExit interface { // OnPrepare, OnRequest, CloseCallback share the lock processing, // which is a CAS lock and can only be cleared by OnRequest. type onEvent struct { - ctx context.Context - onConnectCallback atomic.Value - onRequestCallback atomic.Value - closeCallbacks atomic.Value // value is latest *callbackNode + ctx context.Context + onConnectCallback atomic.Value + onDisconnectCallback atomic.Value + onRequestCallback atomic.Value + closeCallbacks atomic.Value // value is latest *callbackNode } type callbackNode struct { @@ -67,6 +68,14 @@ func (c *connection) SetOnConnect(onConnect OnConnect) error { return nil } +// SetOnDisconnect set the OnDisconnect callback. +func (c *connection) SetOnDisconnect(onDisconnect OnDisconnect) error { + if onDisconnect != nil { + c.onDisconnectCallback.Store(onDisconnect) + } + return nil +} + // SetOnRequest initialize ctx when setting OnRequest. func (c *connection) SetOnRequest(onRequest OnRequest) error { if onRequest == nil { @@ -99,6 +108,7 @@ func (c *connection) AddCloseCallback(callback CloseCallback) error { func (c *connection) onPrepare(opts *options) (err error) { if opts != nil { c.SetOnConnect(opts.onConnect) + c.SetOnDisconnect(opts.onDisconnect) c.SetOnRequest(opts.onRequest) c.SetReadTimeout(opts.readTimeout) c.SetWriteTimeout(opts.writeTimeout) @@ -150,6 +160,14 @@ func (c *connection) onConnect() { ) } +func (c *connection) onDisconnect() { + var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect) + if onDisconnect == nil { + return + } + onDisconnect(c.ctx, c) +} + // onRequest is responsible for executing the closeCallbacks after the connection has been closed. func (c *connection) onRequest() (needTrigger bool) { var onRequest, ok = c.onRequestCallback.Load().(OnRequest) diff --git a/connection_reactor.go b/connection_reactor.go index cd5d717c..25b4dec5 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -30,6 +30,10 @@ func (c *connection) onHup(p Poll) error { } c.triggerRead(Exception(ErrEOF, "peer close")) c.triggerWrite(Exception(ErrConnClosed, "peer close")) + + // call Disconnect callback first + c.onDisconnect() + // It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively. // It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing, // and it is safe to close the buffer at this time. 
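The eventloop.go and netpoll_options.go hunks below define the OnDisconnect type and the WithOnDisconnect option that the onHup change above now invokes. A minimal registration sketch, assuming netpoll's public CreateListener/NewEventLoop APIs; the handler bodies are illustrative:

```go
package main

import (
	"context"
	"log"

	"github.com/cloudwego/netpoll"
)

func main() {
	ln, err := netpoll.CreateListener("tcp", ":8080")
	if err != nil {
		log.Fatal(err)
	}

	eventLoop, err := netpoll.NewEventLoop(
		// OnRequest: drain whatever arrived.
		func(ctx context.Context, conn netpoll.Connection) error {
			_, err := conn.Reader().Next(conn.Reader().Len())
			return err
		},
		// OnDisconnect: runs when the poller sees the peer close. Per the doc
		// comment in eventloop.go below, it should return quickly because it
		// blocks the poller.
		netpoll.WithOnDisconnect(func(ctx context.Context, conn netpoll.Connection) {
			log.Printf("peer closed: %v", conn.RemoteAddr())
		}),
	)
	if err != nil {
		log.Fatal(err)
	}

	// CloseCallback (via conn.AddCloseCallback) still fires later, once the
	// connection is fully destroyed; OnDisconnect is the earlier, poller-side hook.
	log.Fatal(eventLoop.Serve(ln))
}
```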
diff --git a/eventloop.go b/eventloop.go index c9a903c0..70d4d0be 100644 --- a/eventloop.go +++ b/eventloop.go @@ -34,6 +34,15 @@ type EventLoop interface { Shutdown(ctx context.Context) error } +/* The Connection Callback Sequence Diagram +| Connection State | Callback Function | Notes +| Connected but not initialized | OnPrepare | Conn is not registered into poller +| Connected and initialized | OnConnect | Conn is ready for read or write +| Read first byte | OnRequest | Conn is ready for read or write +| Peer closed but conn is active | OnDisconnect | Conn access will race with OnRequest function +| Self closed and conn is closed | CloseCallback | Conn is destroyed +*/ + // OnPrepare is used to inject custom preparation at connection initialization, // which is optional but important in some scenarios. For example, a qps limiter // can be set by closing overloaded connections directly in OnPrepare. @@ -63,6 +72,11 @@ type OnPrepare func(connection Connection) context.Context // } type OnConnect func(ctx context.Context, connection Connection) context.Context +// OnDisconnect is called once connection is going to be closed. +// OnDisconnect must return as quick as possible because it will block poller. +// OnDisconnect is different from CloseCallback, you could check with "The Connection Callback Sequence Diagram" section. +type OnDisconnect func(ctx context.Context, connection Connection) + // OnRequest defines the function for handling connection. When data is sent from the connection peer, // netpoll actively reads the data in LT mode and places it in the connection's input buffer. // Generally, OnRequest starts handling the data in the following way: diff --git a/netpoll_options.go b/netpoll_options.go index 023e574c..db752fe4 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -77,6 +77,13 @@ func WithOnConnect(onConnect OnConnect) Option { }} } +// WithOnDisconnect registers the OnDisconnect method to EventLoop. +func WithOnDisconnect(onDisconnect OnDisconnect) Option { + return Option{func(op *options) { + op.onDisconnect = onDisconnect + }} +} + // WithReadTimeout sets the read timeout of connections. 
func WithReadTimeout(timeout time.Duration) Option { return Option{func(op *options) { @@ -106,6 +113,7 @@ type Option struct { type options struct { onPrepare OnPrepare onConnect OnConnect + onDisconnect OnDisconnect onRequest OnRequest readTimeout time.Duration writeTimeout time.Duration diff --git a/netpoll_test.go b/netpoll_test.go index 0467e879..219ec761 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -136,6 +136,66 @@ func TestOnConnectWrite(t *testing.T) { MustNil(t, err) } +func TestOnDisconnect(t *testing.T) { + var ctxKey = struct{}{} + var network, address = "tcp", ":8888" + var canceled, closed int32 + var conns int32 = 100 + req := "ping" + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) + MustTrue(t, cancelFunc != nil) + Assert(t, ctx.Done() != nil) + + buf, err := connection.Reader().Next(4) // should consumed all data + MustNil(t, err) + Equal(t, string(buf), req) + select { + case <-ctx.Done(): + atomic.AddInt32(&canceled, 1) + case <-time.After(time.Second): + } + return nil + }, + WithOnConnect(func(ctx context.Context, conn Connection) context.Context { + conn.AddCloseCallback(func(connection Connection) error { + atomic.AddInt32(&closed, 1) + return nil + }) + ctx, cancel := context.WithCancel(ctx) + return context.WithValue(ctx, ctxKey, cancel) + }), + WithOnDisconnect(func(ctx context.Context, conn Connection) { + cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) + MustTrue(t, cancelFunc != nil) + cancelFunc() + }), + ) + + for i := int32(0); i < conns; i++ { + var conn, err = DialConnection(network, address, time.Second) + MustNil(t, err) + + _, err = conn.Writer().WriteString(req) + MustNil(t, err) + err = conn.Writer().Flush() + MustNil(t, err) + + err = conn.Close() + MustNil(t, err) + } + for atomic.LoadInt32(&closed) < conns { + t.Logf("closed: %d, canceled: %d", atomic.LoadInt32(&closed), atomic.LoadInt32(&canceled)) + runtime.Gosched() + } + Equal(t, atomic.LoadInt32(&closed), conns) + Equal(t, atomic.LoadInt32(&canceled), conns) + + err := loop.Shutdown(context.Background()) + MustNil(t, err) +} + func TestGracefulExit(t *testing.T) { var network, address = "tcp", ":8888" From faa52638971c84869e1b6783775a2d9efc463358 Mon Sep 17 00:00:00 2001 From: Joway Date: Tue, 6 Feb 2024 15:15:12 +0800 Subject: [PATCH 09/11] fix: connection leak when poller close connection but onRequest callback just finished (#304) --- connection_onevent.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/connection_onevent.go b/connection_onevent.go index 10bead70..51274e08 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -228,7 +228,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f } process(c) } - // Handling callback if connection has been closed. + // handling callback if connection has been closed. if closedBy != none { // if closed by user when processing, it "may" needs detach needDetach := closedBy == user @@ -241,7 +241,17 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f return } c.unlock(processing) - // Double check when exiting. + // Note: Poller's closeCallback call will try to get processing lock failed but here already neer to unlock processing. 
+ // So here we need to check connection state again, to avoid connection leak + // double check close state + if c.status(closing) != 0 && c.lock(processing) { + // poller will get the processing lock failed, here help poller do closeCallback + // fd must already detach by poller + c.closeCallback(false, false) + panicked = false + return + } + // double check isProcessable if isProcessable(c) && c.lock(processing) { goto START } From b19383452abd17fc22f7185b02fc9427a25fddd1 Mon Sep 17 00:00:00 2001 From: Joway Date: Tue, 20 Feb 2024 13:35:52 +0800 Subject: [PATCH 10/11] fix: ctx race when disconnect callback run with connect callback (#307) --- connection_impl.go | 6 +++-- connection_lock.go | 1 + connection_onevent.go | 46 ++++++++++++++++++++++++++++++---- connection_test.go | 7 ++++-- eventloop.go | 5 ++++ netpoll_test.go | 58 ++++++++++++++++++++++++++++++++++++++++--- 6 files changed, 110 insertions(+), 13 deletions(-) diff --git a/connection_impl.go b/connection_impl.go index 77212de6..b683b4df 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -45,8 +45,9 @@ type connection struct { outputBuffer *LinkBuffer outputBarrier *barrier supportZeroCopy bool - maxSize int // The maximum size of data between two Release(). - bookSize int // The size of data that can be read at once. + maxSize int // The maximum size of data between two Release(). + bookSize int // The size of data that can be read at once. + state int32 // 0: not connected, 1: connected, 2: disconnected. Connection state should be changed sequentially. } var ( @@ -323,6 +324,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) { c.bookSize, c.maxSize = pagesize, pagesize c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer() c.outputBarrier = barrierPool.Get().(*barrier) + c.state = 0 c.initNetFD(conn) // conn must be *netFD{} c.initFDOperator() diff --git a/connection_lock.go b/connection_lock.go index 4b0f7360..b6e6fa3c 100644 --- a/connection_lock.go +++ b/connection_lock.go @@ -45,6 +45,7 @@ type key int32 const ( closing key = iota + connecting processing flushing // total must be at the bottom. 
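Before the connection_onevent.go changes below, here is a stripped-down model of what the new state field and the connecting lock provide: whichever of the connect path or the poller's hang-up path finishes second runs the disconnect hook, exactly once and only after OnConnect. The sketch uses invented names and omits the real implementation's connecting try-lock; it is an illustration of the CAS handshake, not netpoll's code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Mirrors the comment on the new field: 0 not connected, 1 connected, 2 disconnected.
type demoConn struct{ state int32 }

// connectPath is the OnConnect side: it publishes "connected" and, if the peer
// already hung up in the meantime, helps run the disconnect hook itself.
func (c *demoConn) connectPath(peerGone *int32) {
	fmt.Println("onConnect")
	atomic.StoreInt32(&c.state, 1)
	if atomic.LoadInt32(peerGone) == 1 && atomic.CompareAndSwapInt32(&c.state, 1, 2) {
		fmt.Println("onDisconnect (helped by connect path)")
	}
}

// pollerPath is the hang-up side: it only runs the hook if OnConnect has
// finished; the CAS guarantees exactly one of the two paths wins.
func (c *demoConn) pollerPath(peerGone *int32) {
	atomic.StoreInt32(peerGone, 1)
	if atomic.CompareAndSwapInt32(&c.state, 1, 2) {
		fmt.Println("onDisconnect (poller path)")
	}
}

func main() {
	c := &demoConn{}
	var peerGone int32
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.connectPath(&peerGone) }()
	go func() { defer wg.Done(); c.pollerPath(&peerGone) }()
	wg.Wait() // in every interleaving, "onDisconnect" prints exactly once, after "onConnect"
}
```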
diff --git a/connection_onevent.go b/connection_onevent.go index 51274e08..35b7c001 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -134,23 +134,36 @@ func (c *connection) onPrepare(opts *options) (err error) { func (c *connection) onConnect() { var onConnect, _ = c.onConnectCallback.Load().(OnConnect) if onConnect == nil { + atomic.StoreInt32(&c.state, 1) + return + } + if !c.lock(connecting) { + // it never happens because onDisconnect will not lock connecting if c.connected == 0 return } var onRequest, _ = c.onRequestCallback.Load().(OnRequest) - var connected int32 c.onProcess( // only process when conn active and have unread data func(c *connection) bool { // if onConnect not called - if atomic.LoadInt32(&connected) == 0 { + if atomic.LoadInt32(&c.state) == 0 { return true } // check for onRequest return onRequest != nil && c.Reader().Len() > 0 }, func(c *connection) { - if atomic.CompareAndSwapInt32(&connected, 0, 1) { + if atomic.CompareAndSwapInt32(&c.state, 0, 1) { c.ctx = onConnect(c.ctx, c) + + if !c.IsActive() && atomic.CompareAndSwapInt32(&c.state, 1, 2) { + // since we hold connecting lock, so we should help to call onDisconnect here + var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect) + if onDisconnect != nil { + onDisconnect(c.ctx, c) + } + } + c.unlock(connecting) return } if onRequest != nil { @@ -160,12 +173,31 @@ func (c *connection) onConnect() { ) } +// when onDisconnect called, c.IsActive() must return false func (c *connection) onDisconnect() { var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect) if onDisconnect == nil { return } - onDisconnect(c.ctx, c) + var onConnect, _ = c.onConnectCallback.Load().(OnConnect) + if onConnect == nil { + // no need lock if onConnect is nil + atomic.StoreInt32(&c.state, 2) + onDisconnect(c.ctx, c) + return + } + // check if OnConnect finished when onConnect != nil && onDisconnect != nil + if atomic.LoadInt32(&c.state) > 0 && c.lock(connecting) { // means OnConnect already finished + // protect onDisconnect run once + // if CAS return false, means OnConnect already helps to run onDisconnect + if atomic.CompareAndSwapInt32(&c.state, 1, 2) { + onDisconnect(c.ctx, c) + } + c.unlock(connecting) + return + } + // OnConnect is not finished yet, return and let onConnect helps to call onDisconnect + return } // onRequest is responsible for executing the closeCallbacks after the connection has been closed. 
@@ -174,6 +206,11 @@ func (c *connection) onRequest() (needTrigger bool) { if !ok { return true } + // wait onConnect finished first + if atomic.LoadInt32(&c.state) == 0 && c.onConnectCallback.Load() != nil { + // let onConnect to call onRequest + return + } processed := c.onProcess( // only process when conn active and have unread data func(c *connection) bool { @@ -259,7 +296,6 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f panicked = false return } - runTask(c.ctx, task) return true } diff --git a/connection_test.go b/connection_test.go index 782e85c2..548d98a2 100644 --- a/connection_test.go +++ b/connection_test.go @@ -504,10 +504,10 @@ func TestParallelShortConnection(t *testing.T) { var received int64 el, err := NewEventLoop(func(ctx context.Context, connection Connection) error { data, err := connection.Reader().Next(connection.Reader().Len()) + atomic.AddInt64(&received, int64(len(data))) if err != nil { return err } - atomic.AddInt64(&received, int64(len(data))) //t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive()) return nil }) @@ -536,10 +536,13 @@ func TestParallelShortConnection(t *testing.T) { } wg.Wait() - for atomic.LoadInt64(&received) < int64(totalSize) { + count := 100 + for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) { t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize) time.Sleep(time.Millisecond * 100) + count-- } + Equal(t, atomic.LoadInt64(&received), int64(totalSize)) } func TestConnectionServerClose(t *testing.T) { diff --git a/eventloop.go b/eventloop.go index 70d4d0be..425cd95f 100644 --- a/eventloop.go +++ b/eventloop.go @@ -41,6 +41,11 @@ type EventLoop interface { | Read first byte | OnRequest | Conn is ready for read or write | Peer closed but conn is active | OnDisconnect | Conn access will race with OnRequest function | Self closed and conn is closed | CloseCallback | Conn is destroyed + +Execution Order: + OnPrepare => OnConnect => OnRequest => CloseCallback + OnDisconnect +Note: only OnRequest and OnDisconnect will be executed in parallel */ // OnPrepare is used to inject custom preparation at connection initialization, diff --git a/netpoll_test.go b/netpoll_test.go index 219ec761..fb985604 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -137,14 +137,14 @@ func TestOnConnectWrite(t *testing.T) { } func TestOnDisconnect(t *testing.T) { - var ctxKey = struct{}{} + type ctxKey struct{} var network, address = "tcp", ":8888" var canceled, closed int32 var conns int32 = 100 req := "ping" var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { - cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) + cancelFunc, _ := ctx.Value(ctxKey{}).(context.CancelFunc) MustTrue(t, cancelFunc != nil) Assert(t, ctx.Done() != nil) @@ -164,10 +164,10 @@ func TestOnDisconnect(t *testing.T) { return nil }) ctx, cancel := context.WithCancel(ctx) - return context.WithValue(ctx, ctxKey, cancel) + return context.WithValue(ctx, ctxKey{}, cancel) }), WithOnDisconnect(func(ctx context.Context, conn Connection) { - cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) + cancelFunc, _ := ctx.Value(ctxKey{}).(context.CancelFunc) MustTrue(t, cancelFunc != nil) cancelFunc() }), @@ -196,6 +196,56 @@ func TestOnDisconnect(t *testing.T) { MustNil(t, err) } +func TestOnDisconnectWhenOnConnect(t *testing.T) { + type ctxPrepareKey struct{} + type ctxConnectKey struct{} + var network, address = "tcp", ":8888" + var 
conns int32 = 100 + var wg sync.WaitGroup + wg.Add(int(conns) * 3) + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + _, _ = connection.Reader().Next(connection.Reader().Len()) + return nil + }, + WithOnPrepare(func(connection Connection) context.Context { + defer wg.Done() + var counter int32 + return context.WithValue(context.Background(), ctxPrepareKey{}, &counter) + }), + WithOnConnect(func(ctx context.Context, conn Connection) context.Context { + defer wg.Done() + t.Logf("OnConnect: %v", conn.RemoteAddr()) + time.Sleep(time.Millisecond * 10) // wait for closed called + counter := ctx.Value(ctxPrepareKey{}).(*int32) + ok := atomic.CompareAndSwapInt32(counter, 0, 1) + Assert(t, ok) + return context.WithValue(ctx, ctxConnectKey{}, "123") + }), + WithOnDisconnect(func(ctx context.Context, conn Connection) { + defer wg.Done() + t.Logf("OnDisconnect: %v", conn.RemoteAddr()) + counter, _ := ctx.Value(ctxPrepareKey{}).(*int32) + ok := atomic.CompareAndSwapInt32(counter, 1, 2) + Assert(t, ok) + v := ctx.Value(ctxConnectKey{}).(string) + Equal(t, v, "123") + }), + ) + + for i := int32(0); i < conns; i++ { + var conn, err = DialConnection(network, address, time.Second) + MustNil(t, err) + err = conn.Close() + t.Logf("Close: %v", conn.LocalAddr()) + MustNil(t, err) + } + + wg.Wait() + err := loop.Shutdown(context.Background()) + MustNil(t, err) +} + func TestGracefulExit(t *testing.T) { var network, address = "tcp", ":8888" From 7ba622bf763b69fcb3fa7ca43bcbaea9adecc6b2 Mon Sep 17 00:00:00 2001 From: Joway Date: Tue, 20 Feb 2024 17:04:56 +0800 Subject: [PATCH 11/11] feat: lazy init pollers to avoid create any poller goroutines if netpoll is not used (#306) --- netpoll_options.go | 8 ++- poll_manager.go | 118 ++++++++++++++++++++++++++++--------------- poll_manager_test.go | 46 +++++++++++++++-- 3 files changed, 127 insertions(+), 45 deletions(-) diff --git a/netpoll_options.go b/netpoll_options.go index db752fe4..2cdb1c13 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -40,11 +40,17 @@ func SetNumLoops(numLoops int) error { // SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt // to distribute the incoming connections between multiple polls. -// This option only works when NumLoops is set. +// This option only works when numLoops is set. func SetLoadBalance(lb LoadBalance) error { return setLoadBalance(lb) } +// Initialize the pollers actively. By default, it's lazy initialized. +// It's safe to call it multi times. 
+func Initialize() { + initialize() +} + func SetLoggerOutput(w io.Writer) { setLoggerOutput(w) } diff --git a/poll_manager.go b/poll_manager.go index 119187c0..4183ac3d 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -23,6 +23,7 @@ import ( "log" "os" "runtime" + "sync/atomic" ) func setNumLoops(numLoops int) error { @@ -33,57 +34,55 @@ func setLoadBalance(lb LoadBalance) error { return pollmanager.SetLoadBalance(lb) } +func initialize() { + // The first call of Pick() will init pollers + _ = pollmanager.Pick() +} + func setLoggerOutput(w io.Writer) { logger = log.New(w, "", log.LstdFlags) } -// manage all pollers +// pollmanager manage all pollers var pollmanager *manager var logger *log.Logger func init() { - var loops = runtime.GOMAXPROCS(0)/20 + 1 - pollmanager = &manager{} - pollmanager.SetLoadBalance(RoundRobin) - pollmanager.SetNumLoops(loops) - + pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) setLoggerOutput(os.Stderr) } +const ( + managerUninitialized = iota + managerInitializing + managerInitialized +) + +func newManager(numLoops int) *manager { + m := new(manager) + m.SetLoadBalance(RoundRobin) + m.SetNumLoops(numLoops) + return m +} + // LoadBalance is used to do load balancing among multiple pollers. // a single poller may not be optimal if the number of cores is large (40C+). type manager struct { - NumLoops int + numLoops int32 + status int32 // 0: uninitialized, 1: initializing, 2: initialized balance loadbalance // load balancing method polls []Poll // all the polls } // SetNumLoops will return error when set numLoops < 1 -func (m *manager) SetNumLoops(numLoops int) error { +func (m *manager) SetNumLoops(numLoops int) (err error) { if numLoops < 1 { return fmt.Errorf("set invalid numLoops[%d]", numLoops) } - - if numLoops < m.NumLoops { - // if less than, close the redundant pollers - var polls = make([]Poll, numLoops) - for idx := 0; idx < m.NumLoops; idx++ { - if idx < numLoops { - polls[idx] = m.polls[idx] - } else { - if err := m.polls[idx].Close(); err != nil { - logger.Printf("NETPOLL: poller close failed: %v\n", err) - } - } - } - m.NumLoops = numLoops - m.polls = polls - m.balance.Rebalance(m.polls) - return nil - } - - m.NumLoops = numLoops - return m.Run() + // note: set new numLoops first and then change the status + atomic.StoreInt32(&m.numLoops, int32(numLoops)) + atomic.StoreInt32(&m.status, managerUninitialized) + return nil } // SetLoadBalance set load balance. @@ -96,14 +95,14 @@ func (m *manager) SetLoadBalance(lb LoadBalance) error { } // Close release all resources. -func (m *manager) Close() error { +func (m *manager) Close() (err error) { for _, poll := range m.polls { - poll.Close() + err = poll.Close() } - m.NumLoops = 0 + m.numLoops = 0 m.balance = nil m.polls = nil - return nil + return err } // Run all pollers. @@ -114,16 +113,34 @@ func (m *manager) Run() (err error) { } }() - // new poll to fill delta. 
- for idx := len(m.polls); idx < m.NumLoops; idx++ { - var poll Poll - poll, err = openPoll() - if err != nil { - return + numLoops := int(atomic.LoadInt32(&m.numLoops)) + if numLoops == len(m.polls) { + return nil + } + var polls = make([]Poll, numLoops) + if numLoops < len(m.polls) { + // shrink polls + copy(polls, m.polls[:numLoops]) + for idx := numLoops; idx < len(m.polls); idx++ { + // close redundant polls + if err = m.polls[idx].Close(); err != nil { + logger.Printf("NETPOLL: poller close failed: %v\n", err) + } + } + } else { + // growth polls + copy(polls, m.polls) + for idx := len(m.polls); idx < numLoops; idx++ { + var poll Poll + poll, err = openPoll() + if err != nil { + return err + } + polls[idx] = poll + go poll.Wait() } - m.polls = append(m.polls, poll) - go poll.Wait() } + m.polls = polls // LoadBalance must be set before calling Run, otherwise it will panic. m.balance.Rebalance(m.polls) @@ -141,5 +158,24 @@ func (m *manager) Reset() error { // Pick will select the poller for use each time based on the LoadBalance. func (m *manager) Pick() Poll { +START: + // fast path + if atomic.LoadInt32(&m.status) == managerInitialized { + return m.balance.Pick() + } + // slow path + // try to get initializing lock failed, wait others finished the init work, and try again + if !atomic.CompareAndSwapInt32(&m.status, managerUninitialized, managerInitializing) { + runtime.Gosched() + goto START + } + // adjust polls + // m.Run() will finish very quickly, so will not many goroutines block on Pick. + _ = m.Run() + + if !atomic.CompareAndSwapInt32(&m.status, managerInitializing, managerInitialized) { + // SetNumLoops called during m.Run() which cause CAS failed + // The polls will be adjusted next Pick + } return m.balance.Pick() } diff --git a/poll_manager_test.go b/poll_manager_test.go index f79f3003..63559051 100644 --- a/poll_manager_test.go +++ b/poll_manager_test.go @@ -18,6 +18,8 @@ package netpoll import ( + "runtime" + "sync" "testing" "time" ) @@ -45,9 +47,47 @@ func TestPollManager(t *testing.T) { } func TestPollManagerReset(t *testing.T) { - n := pollmanager.NumLoops + n := pollmanager.numLoops err := pollmanager.Reset() MustNil(t, err) - Equal(t, len(pollmanager.polls), n) - Equal(t, pollmanager.NumLoops, n) + Equal(t, len(pollmanager.polls), int(n)) +} + +func TestPollManagerSetNumLoops(t *testing.T) { + pm := newManager(1) + + startGs := runtime.NumGoroutine() + poll := pm.Pick() + newGs := runtime.NumGoroutine() + Assert(t, poll != nil) + Assert(t, newGs-startGs == 1, newGs, startGs) + t.Logf("old=%d, new=%d", startGs, newGs) + + // change pollers + oldGs := newGs + err := pm.SetNumLoops(100) + MustNil(t, err) + newGs = runtime.NumGoroutine() + t.Logf("old=%d, new=%d", oldGs, newGs) + Assert(t, newGs == oldGs) + + // trigger polls adjustment + var wg sync.WaitGroup + finish := make(chan struct{}) + oldGs = startGs + 32 // 32 self goroutines + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + poll := pm.Pick() + newGs := runtime.NumGoroutine() + t.Logf("old=%d, new=%d", oldGs, newGs) + Assert(t, poll != nil) + Assert(t, newGs-oldGs == 100) + Assert(t, len(pm.polls) == 100) + wg.Done() + <-finish // hold goroutines + }() + } + wg.Wait() + close(finish) }
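A closing usage sketch for the lazy-initialization change in the last patch, assuming the standard github.com/cloudwego/netpoll import path. Nothing here is mandatory: the first time a connection needs a poller (for example, the first DialConnection or Serve), the manager initializes itself on demand.

```go
package main

import (
	"log"

	"github.com/cloudwego/netpoll"
)

func main() {
	// With this patch SetNumLoops only records the value and flags the manager
	// for (re)initialization; no poller goroutines are created here.
	if err := netpoll.SetNumLoops(4); err != nil {
		log.Fatal(err)
	}
	if err := netpoll.SetLoadBalance(netpoll.RoundRobin); err != nil {
		log.Fatal(err)
	}

	// Optional: start the pollers eagerly, e.g. to front-load poller setup or
	// keep goroutine counts deterministic in tests. Safe to call repeatedly.
	netpoll.Initialize()
}
```

Programs that link netpoll but never use it now create no poller goroutines at all, which is the stated point of the change.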