diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index ba4c9262..798696ea 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -6,8 +6,13 @@ jobs: compatibility-test: strategy: matrix: - go: [ 1.15, "1.21" ] - os: [ X64, ARM64 ] + go: [ 1.15, 1.22 ] + # - "ubuntu-latest" is for Linux with X64 CPU, hosted by GitHub, + # fewer CPUs but high speed international network + # - "ARM64" is for Linux with ARM64 CPU, hosted by bytedance, + # more CPUs but inside CN internet which may download go cache slowly. + # GitHub don't have free runner with ARM CPU. + os: [ ubuntu-latest, ARM64 ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -15,14 +20,8 @@ jobs: uses: actions/setup-go@v5 with: go-version: ${{ matrix.go }} -# - uses: actions/cache@v2 -# with: -# path: ~/go/pkg/mod -# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} -# restore-keys: | -# ${{ runner.os }}-go- - name: Unit Test - run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./... + run: go test -timeout=2m -v -race -covermode=atomic -coverprofile=coverage.out ./... - name: Benchmark run: go test -bench=. -benchmem -run=none ./... windows-test: @@ -32,13 +31,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: "1.20" -# - uses: actions/cache@v2 -# with: -# path: ~/go/pkg/mod -# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} -# restore-keys: | -# ${{ runner.os }}-go- + go-version: 1.22 - name: Build Test run: go vet -v ./... style-test: diff --git a/connection_impl.go b/connection_impl.go index bb2609b6..db2f8d69 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -330,8 +330,8 @@ func (c *connection) init(conn Conn, opts *options) (err error) { // init buffer, barrier, finalizer c.readTrigger = make(chan error, 1) c.writeTrigger = make(chan error, 1) - c.bookSize, c.maxSize = pagesize, pagesize - c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer() + c.bookSize, c.maxSize = defaultLinkBufferSize, defaultLinkBufferSize + c.inputBuffer, c.outputBuffer = NewLinkBuffer(defaultLinkBufferSize), NewLinkBuffer() c.outputBarrier = barrierPool.Get().(*barrier) c.state = 0 diff --git a/connection_test.go b/connection_test.go index 2f1c8665..163645ab 100644 --- a/connection_test.go +++ b/connection_test.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "math/rand" "net" "os" "runtime" @@ -102,11 +101,13 @@ func TestConnectionLargeWrite(t *testing.T) { func TestConnectionRead(t *testing.T) { r, w := GetSysFdPairs() var rconn, wconn = &connection{}, &connection{} - rconn.init(&netFD{fd: r}, nil) - wconn.init(&netFD{fd: w}, nil) + err := rconn.init(&netFD{fd: r}, nil) + MustNil(t, err) + err = wconn.init(&netFD{fd: w}, nil) + MustNil(t, err) var size = 256 - var cycleTime = 100000 + var cycleTime = 1000 var msg = make([]byte, size) var wg sync.WaitGroup wg.Add(1) @@ -129,6 +130,13 @@ func TestConnectionRead(t *testing.T) { } func TestConnectionNoCopyReadString(t *testing.T) { + err := Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}}) + MustNil(t, err) + defer func() { + err = Configure(Config{Feature: Feature{AlwaysNoCopyRead: false}}) + MustNil(t, err) + }() + r, w := GetSysFdPairs() var rconn, wconn = &connection{}, &connection{} rconn.init(&netFD{fd: r}, nil) @@ -382,34 +390,28 @@ func TestConnectionLargeMemory(t *testing.T) { rconn.init(&netFD{fd: r}, nil) var wg sync.WaitGroup - defer wg.Wait() - var rn, wn = 1024, 1 * 1024 * 1024 wg.Add(1) go func() { defer wg.Done() - rconn.Reader().Next(wn) + _, err := rconn.Reader().Next(wn) + MustNil(t, err) }() var msg = make([]byte, rn) for i := 0; i < wn/rn; i++ { n, err := syscall.Write(w, msg) if err != nil { - panic(err) - } - if n != rn { - panic(fmt.Sprintf("n[%d]!=rn[%d]", n, rn)) + MustNil(t, err) } - time.Sleep(time.Millisecond) + Equal(t, n, rn) } runtime.ReadMemStats(&end) alloc := end.TotalAlloc - start.TotalAlloc limit := uint64(4 * 1024 * 1024) - if alloc > limit { - panic(fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit)) - } + Assert(t, alloc <= limit, fmt.Sprintf("alloc[%d] out of memory %d", alloc, limit)) } // TestSetTCPNoDelay is used to verify the connection initialization set the TCP_NODELAY correctly @@ -431,7 +433,7 @@ func TestConnectionUntil(t *testing.T) { rconn, wconn := &connection{}, &connection{} rconn.init(&netFD{fd: r}, nil) wconn.init(&netFD{fd: w}, nil) - loopSize := 100000 + loopSize := 10000 msg := make([]byte, 1002) msg[500], msg[1001] = '\n', '\n' @@ -459,37 +461,48 @@ func TestConnectionUntil(t *testing.T) { func TestBookSizeLargerThanMaxSize(t *testing.T) { r, w := GetSysFdPairs() - var rconn, wconn = &connection{}, &connection{} - rconn.init(&netFD{fd: r}, nil) - wconn.init(&netFD{fd: w}, nil) + rconn, wconn := &connection{}, &connection{} + err := rconn.init(&netFD{fd: r}, nil) + MustNil(t, err) + err = wconn.init(&netFD{fd: w}, nil) + MustNil(t, err) - var length = 25 - dataCollection := make([][]byte, length) - for i := 0; i < length; i++ { - dataCollection[i] = make([]byte, 2< 0; size = size >> 1 { + ch := 'a' + byte(size%26) + origin = append(origin, make([]byte, size)) + for i := 0; i < size; i++ { + origin[len(origin)-1][i] = ch } } + // read var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - for i := 0; i < length; i++ { - buf, err := rconn.Reader().Next(2 << i) + idx := 0 + for size := maxSize; size > 0; size = size >> 1 { + buf, err := rconn.Reader().Next(size) MustNil(t, err) - Equal(t, string(buf), string(dataCollection[i])) - rconn.Reader().Release() + Equal(t, string(buf), string(origin[idx])) + err = rconn.Reader().Release() + MustNil(t, err) + idx++ } }() - for i := 0; i < length; i++ { - n, err := wconn.Write(dataCollection[i]) + + // write + for i := 0; i < len(origin); i++ { + n, err := wconn.Write(origin[i]) MustNil(t, err) - Equal(t, n, 2< read => write + var wg sync.WaitGroup go func() { for { conn, err := ln.Accept() @@ -506,35 +521,33 @@ func TestConnDetach(t *testing.T) { if conn == nil { continue } + wg.Add(1) go func() { + defer wg.Done() buf := make([]byte, 1024) // slow read - for { - _, err := conn.Read(buf) - if err != nil { - return - } - time.Sleep(100 * time.Millisecond) - _, err = conn.Write(buf) - if err != nil { - return - } + _, err := conn.Read(buf) + if err != nil { + return + } + time.Sleep(10 * time.Millisecond) + _, err = conn.Write(buf) + if err != nil { + return } }() } }() + // dial => detach => write => read c, err := DialConnection("tcp", address, time.Second) MustNil(t, err) - conn := c.(*TCPConnection) - err = conn.Detach() MustNil(t, err) f := os.NewFile(uintptr(conn.fd), "netpoll-connection") defer f.Close() - gonetconn, err := net.FileConn(f) MustNil(t, err) buf := make([]byte, 1024) @@ -545,13 +558,14 @@ func TestConnDetach(t *testing.T) { err = gonetconn.Close() MustNil(t, err) - err = ln.Close() MustNil(t, err) + err = c.Close() + MustNil(t, err) + wg.Wait() } func TestParallelShortConnection(t *testing.T) { - t.Skip("TODO: it's not stable now, need fix CI") address := getTestAddress() ln, err := createTestListener("tcp", address) MustNil(t, err) @@ -592,11 +606,8 @@ func TestParallelShortConnection(t *testing.T) { } wg.Wait() - count := 100 - for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) { - t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize) - time.Sleep(time.Millisecond * 100) - count-- + for atomic.LoadInt64(&received) < int64(totalSize) { + runtime.Gosched() } Equal(t, atomic.LoadInt64(&received), int64(totalSize)) } @@ -619,7 +630,7 @@ func TestConnectionServerClose(t *testing.T) { var wg sync.WaitGroup el, err := NewEventLoop( func(ctx context.Context, connection Connection) error { - t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr()) + //t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr()) defer wg.Done() buf, err := connection.Reader().Next(len(PONG)) // pong Equal(t, string(buf), PONG) @@ -642,14 +653,14 @@ func TestConnectionServerClose(t *testing.T) { err = connection.Writer().Flush() MustNil(t, err) connection.AddCloseCallback(func(connection Connection) error { - t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr()) + //t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr()) wg.Done() return nil }) return ctx }), WithOnPrepare(func(connection Connection) context.Context { - t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr()) + //t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr()) defer wg.Done() return context.WithValue(context.Background(), "prepare", "true") }), @@ -690,13 +701,12 @@ func TestConnectionServerClose(t *testing.T) { err = conn.SetOnRequest(clientOnRequest) MustNil(t, err) conn.AddCloseCallback(func(connection Connection) error { - t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr()) + //t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr()) defer wg.Done() return nil }) }() } - //time.Sleep(time.Second) wg.Wait() } diff --git a/netpoll.go b/netpoll.go deleted file mode 100644 index 7f53f2a2..00000000 --- a/netpoll.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2022 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux -// +build darwin netbsd freebsd openbsd dragonfly linux - -package netpoll - -import ( - "context" - "net" - "runtime" - "sync" -) - -// NewEventLoop . -func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) { - opts := &options{ - onRequest: onRequest, - } - for _, do := range ops { - do.f(opts) - } - return &eventLoop{ - opts: opts, - stop: make(chan error, 1), - }, nil -} - -type eventLoop struct { - sync.Mutex - opts *options - svr *server - stop chan error -} - -// Serve implements EventLoop. -func (evl *eventLoop) Serve(ln net.Listener) error { - npln, err := ConvertListener(ln) - if err != nil { - return err - } - evl.Lock() - evl.svr = newServer(npln, evl.opts, evl.quit) - evl.svr.Run() - evl.Unlock() - - err = evl.waitQuit() - // ensure evl will not be finalized until Serve returns - runtime.SetFinalizer(evl, nil) - return err -} - -// Shutdown signals a shutdown a begins server closing. -func (evl *eventLoop) Shutdown(ctx context.Context) error { - evl.Lock() - var svr = evl.svr - evl.svr = nil - evl.Unlock() - - if svr == nil { - return nil - } - evl.quit(nil) - return svr.Close(ctx) -} - -// waitQuit waits for a quit signal -func (evl *eventLoop) waitQuit() error { - return <-evl.stop -} - -func (evl *eventLoop) quit(err error) { - select { - case evl.stop <- err: - default: - } -} diff --git a/netpoll_config.go b/netpoll_config.go new file mode 100644 index 00000000..dda65a35 --- /dev/null +++ b/netpoll_config.go @@ -0,0 +1,47 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import ( + "context" + "io" + "time" +) + +// global config +var ( + defaultLinkBufferSize = pagesize + defaultGracefulShutdownCheckInterval = time.Second + featureAlwaysNoCopyRead = false +) + +// Config expose some tuning parameters to control the internal behaviors of netpoll. +// Every parameter with the default zero value should keep the default behavior of netpoll. +type Config struct { + PollerNum int // number of pollers + BufferSize int // default size of a new connection's LinkBuffer + Runner func(ctx context.Context, f func()) // runner for event handler, most of the time use a goroutine pool. + LoggerOutput io.Writer // logger output + LoadBalance LoadBalance // load balance for poller picker + Feature // define all features that not enable by default +} + +// Feature expose some new features maybe promoted as a default behavior but not yet. +type Feature struct { + // AlwaysNoCopyRead allows some copy Read functions like ReadBinary/ReadString + // will use NoCopy read and will not reuse the underlying buffer. + // It gains more performance benefits when need read much big string/bytes in codec. + AlwaysNoCopyRead bool +} diff --git a/netpoll_options.go b/netpoll_options.go index 2cdb1c13..b72bba49 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -1,4 +1,4 @@ -// Copyright 2022 CloudWeGo Authors +// Copyright 2024 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,61 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !windows -// +build !windows - package netpoll -import ( - "context" - "io" - "time" -) - -// SetNumLoops is used to set the number of pollers, generally do not need to actively set. -// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1. -// If the number of cores in your service process is less than 20c, theoretically only one poller is needed. -// Otherwise you may need to adjust the number of pollers to achieve the best results. -// Experience recommends assigning a poller every 20c. -// -// You can only use SetNumLoops before any connection is created. An example usage: -// -// func init() { -// netpoll.SetNumLoops(...) -// } -func SetNumLoops(numLoops int) error { - return setNumLoops(numLoops) -} - -// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt -// to distribute the incoming connections between multiple polls. -// This option only works when numLoops is set. -func SetLoadBalance(lb LoadBalance) error { - return setLoadBalance(lb) -} +import "time" -// Initialize the pollers actively. By default, it's lazy initialized. -// It's safe to call it multi times. -func Initialize() { - initialize() -} - -func SetLoggerOutput(w io.Writer) { - setLoggerOutput(w) -} - -// SetRunner set the runner function for every OnRequest/OnConnect callback -func SetRunner(f func(ctx context.Context, f func())) { - setRunner(f) +// Option . +type Option struct { + f func(*options) } -// DisableGopool will remove gopool(the goroutine pool used to run OnRequest), -// which means that OnRequest will be run via `go OnRequest(...)`. -// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. -// But if you can confirm that the OnRequest will not cause stack expansion, -// it is recommended to use DisableGopool to reduce redundancy and improve performance. -func DisableGopool() error { - return disableGopool() +type options struct { + onPrepare OnPrepare + onConnect OnConnect + onDisconnect OnDisconnect + onRequest OnRequest + readTimeout time.Duration + writeTimeout time.Duration + idleTimeout time.Duration } // WithOnPrepare registers the OnPrepare method to EventLoop. @@ -110,18 +72,3 @@ func WithIdleTimeout(timeout time.Duration) Option { op.idleTimeout = timeout }} } - -// Option . -type Option struct { - f func(*options) -} - -type options struct { - onPrepare OnPrepare - onConnect OnConnect - onDisconnect OnDisconnect - onRequest OnRequest - readTimeout time.Duration - writeTimeout time.Duration - idleTimeout time.Duration -} diff --git a/netpoll_server.go b/netpoll_server.go index 78f26b59..ace92ba6 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -63,7 +63,7 @@ func (s *server) Close(ctx context.Context) error { s.operator.Control(PollDetach) s.ln.Close() - var ticker = time.NewTicker(time.Second) + var ticker = time.NewTicker(defaultGracefulShutdownCheckInterval) defer ticker.Stop() var hasConn bool for { diff --git a/netpoll_unix.go b/netpoll_unix.go new file mode 100644 index 00000000..4eb25a05 --- /dev/null +++ b/netpoll_unix.go @@ -0,0 +1,179 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux +// +build darwin netbsd freebsd openbsd dragonfly linux + +package netpoll + +import ( + "context" + "io" + "log" + "net" + "os" + "runtime" + "sync" +) + +var ( + pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manage all pollers + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Initialize the pollers actively. By default, it's lazy initialized. +// It's safe to call it multi times. +func Initialize() { + // The first call of Pick() will init pollers + _ = pollmanager.Pick() +} + +// Configure the internal behaviors of netpoll. +// Configure must called in init() function, because the poller will read some global variable after init() finished +func Configure(config Config) (err error) { + if config.PollerNum > 0 { + if err = pollmanager.SetNumLoops(config.PollerNum); err != nil { + return err + } + } + if config.BufferSize > 0 { + defaultLinkBufferSize = config.BufferSize + } + + if config.Runner != nil { + setRunner(config.Runner) + } + if config.LoggerOutput != nil { + logger = log.New(config.LoggerOutput, "", log.LstdFlags) + } + if config.LoadBalance >= 0 { + if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil { + return err + } + } + + featureAlwaysNoCopyRead = config.AlwaysNoCopyRead + return nil +} + +// SetNumLoops is used to set the number of pollers, generally do not need to actively set. +// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1. +// If the number of cores in your service process is less than 20c, theoretically only one poller is needed. +// Otherwise, you may need to adjust the number of pollers to achieve the best results. +// Experience recommends assigning a poller every 20c. +// +// You can only use SetNumLoops before any connection is created. An example usage: +// +// func init() { +// netpoll.SetNumLoops(...) +// } +// +// Deprecated: use Configure instead. +func SetNumLoops(numLoops int) error { + return pollmanager.SetNumLoops(numLoops) +} + +// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt +// to distribute the incoming connections between multiple polls. +// This option only works when numLoops is set. +// Deprecated: use Configure instead. +func SetLoadBalance(lb LoadBalance) error { + return pollmanager.SetLoadBalance(lb) +} + +// SetLoggerOutput sets the logger output target. +// Deprecated: use Configure instead. +func SetLoggerOutput(w io.Writer) { + logger = log.New(w, "", log.LstdFlags) +} + +// SetRunner set the runner function for every OnRequest/OnConnect callback +// Deprecated: use Configure instead. +func SetRunner(f func(ctx context.Context, f func())) { + setRunner(f) +} + +// DisableGopool will remove gopool(the goroutine pool used to run OnRequest), +// which means that OnRequest will be run via `go OnRequest(...)`. +// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. +// But if you can confirm that the OnRequest will not cause stack expansion, +// it is recommended to use DisableGopool to reduce redundancy and improve performance. +// Deprecated: use Configure instead. +func DisableGopool() error { + return disableGopool() +} + +// NewEventLoop . +func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) { + opts := &options{ + onRequest: onRequest, + } + for _, do := range ops { + do.f(opts) + } + return &eventLoop{ + opts: opts, + stop: make(chan error, 1), + }, nil +} + +type eventLoop struct { + sync.Mutex + opts *options + svr *server + stop chan error +} + +// Serve implements EventLoop. +func (evl *eventLoop) Serve(ln net.Listener) error { + npln, err := ConvertListener(ln) + if err != nil { + return err + } + evl.Lock() + evl.svr = newServer(npln, evl.opts, evl.quit) + evl.svr.Run() + evl.Unlock() + + err = evl.waitQuit() + // ensure evl will not be finalized until Serve returns + runtime.SetFinalizer(evl, nil) + return err +} + +// Shutdown signals a shutdown a begins server closing. +func (evl *eventLoop) Shutdown(ctx context.Context) error { + evl.Lock() + var svr = evl.svr + evl.svr = nil + evl.Unlock() + + if svr == nil { + return nil + } + evl.quit(nil) + return svr.Close(ctx) +} + +// waitQuit waits for a quit signal +func (evl *eventLoop) waitQuit() error { + return <-evl.stop +} + +func (evl *eventLoop) quit(err error) { + select { + case evl.stop <- err: + default: + } +} diff --git a/netpoll_test.go b/netpoll_unix_test.go similarity index 93% rename from netpoll_test.go rename to netpoll_unix_test.go index 4156a0b8..70638c57 100644 --- a/netpoll_test.go +++ b/netpoll_unix_test.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "math/rand" "os" "runtime" "sync" @@ -65,6 +64,15 @@ func Assert(t *testing.T, cond bool, val ...interface{}) { } } +func TestMain(m *testing.M) { + // defaultGracefulShutdownCheckInterval will affect shutdown function running time, + // so for speed up tests, we change it to 10ms here + oldGracefulShutdownCheckInterval := defaultGracefulShutdownCheckInterval + defaultGracefulShutdownCheckInterval = time.Millisecond * 10 + m.Run() + defaultGracefulShutdownCheckInterval = oldGracefulShutdownCheckInterval +} + var testPort int32 = 10000 // getTestAddress return a unique port for every tests, so all tests will not share a same listerner @@ -270,40 +278,43 @@ func TestGracefulExit(t *testing.T) { MustNil(t, err) // exit with processing connections + trigger := make(chan struct{}) var eventLoop2 = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { - time.Sleep(10 * time.Second) + <-trigger return nil }) for i := 0; i < 10; i++ { - if i%2 == 0 { - var conn, err = DialConnection(network, address, time.Second) - MustNil(t, err) - _, err = conn.Write(make([]byte, 16)) - MustNil(t, err) - } + // connect success + var conn, err = DialConnection(network, address, time.Second) + MustNil(t, err) + _, err = conn.Write(make([]byte, 16)) + MustNil(t, err) } - var ctx2, cancel2 = context.WithTimeout(context.Background(), 5*time.Second) + // shutdown timeout + var ctx2, cancel2 = context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel2() err = eventLoop2.Shutdown(ctx2) MustTrue(t, err != nil) Equal(t, err.Error(), ctx2.Err().Error()) + // shutdown success + close(trigger) + err = eventLoop2.Shutdown(ctx2) + MustTrue(t, err == nil) - // exit with some processing connections + // exit with read connections + size := 16 var eventLoop3 = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { - time.Sleep(time.Duration(rand.Intn(3)) * time.Second) - if l := connection.Reader().Len(); l > 0 { - var _, err = connection.Reader().Next(l) - MustNil(t, err) - } + _, err := connection.Reader().Next(size) + MustNil(t, err) return nil }) for i := 0; i < 10; i++ { var conn, err = DialConnection(network, address, time.Second) MustNil(t, err) if i%2 == 0 { - _, err = conn.Write(make([]byte, 16)) + _, err := conn.Write(make([]byte, size)) MustNil(t, err) } } @@ -409,15 +420,12 @@ func TestCloseConnWhenOnConnect(t *testing.T) { func TestServerReadAndClose(t *testing.T) { var network, address = "tcp", getTestAddress() var sendMsg = []byte("hello") - var closed int32 var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(len(sendMsg)) MustNil(t, err) - err = connection.Close() MustNil(t, err) - atomic.AddInt32(&closed, 1) return nil }, ) @@ -429,14 +437,13 @@ func TestServerReadAndClose(t *testing.T) { err = conn.Writer().Flush() MustNil(t, err) - for atomic.LoadInt32(&closed) == 0 { + for conn.IsActive() { runtime.Gosched() // wait for poller close connection } - time.Sleep(time.Millisecond * 50) _, err = conn.Writer().WriteBinary(sendMsg) MustNil(t, err) err = conn.Writer().Flush() - MustTrue(t, errors.Is(err, ErrConnClosed)) + Assert(t, errors.Is(err, ErrConnClosed), err) err = loop.Shutdown(context.Background()) MustNil(t, err) diff --git a/netpoll_windows.go b/netpoll_windows.go index 634d1ef9..86434e79 100644 --- a/netpoll_windows.go +++ b/netpoll_windows.go @@ -1,4 +1,4 @@ -// Copyright 2022 CloudWeGo Authors +// Copyright 2024 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,34 +20,11 @@ package netpoll import ( "net" - "time" ) -// Option . -type Option struct { - f func(*options) -} - -type options struct{} - -// WithOnPrepare registers the OnPrepare method to EventLoop. -func WithOnPrepare(onPrepare OnPrepare) Option { - return Option{} -} - -// WithOnConnect registers the OnConnect method to EventLoop. -func WithOnConnect(onConnect OnConnect) Option { - return Option{} -} - -// WithReadTimeout sets the read timeout of connections. -func WithReadTimeout(timeout time.Duration) Option { - return Option{} -} - -// WithIdleTimeout sets the idle timeout of connections. -func WithIdleTimeout(timeout time.Duration) Option { - return Option{} +// Configure the internal behaviors of netpoll. +func Configure(config Config) (err error) { + return nil } // NewDialer only support TCP and unix socket now. diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 3762867c..7843767f 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -252,19 +252,19 @@ func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) { // single node if b.isSingleNode(n) { // TODO: enable nocopy read mode when ensure no legacy depend on copy-read - //// we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself - //if !b.read.getMode(readonlyMask) { - // // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently - // // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec - // // no need to malloc 10 times and the string slice could have the compact memory allocation. - // if b.read.getMode(nocopyReadMask) { - // return b.read.Next(n) - // } - // if n >= minReuseBytes && cap(b.read.buf) <= block32k { - // b.read.setMode(nocopyReadMask, true) - // return b.read.Next(n) - // } - //} + // we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself + if !b.read.getMode(readonlyMask) { + // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently + // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec + // no need to malloc 10 times and the string slice could have the compact memory allocation. + if b.read.getMode(nocopyReadMask) { + return b.read.Next(n) + } + if featureAlwaysNoCopyRead && n >= minReuseBytes { + b.read.setMode(nocopyReadMask, true) + return b.read.Next(n) + } + } // if the underlying buffer too large, we shouldn't use no-copy mode p = dirtmake.Bytes(n, n) copy(p, b.read.Next(n)) @@ -675,9 +675,8 @@ func (b *UnsafeLinkBuffer) calcMaxSize() (sum int) { // resetTail will reset tail node or add an empty tail node to // guarantee the tail node is not larger than 8KB func (b *UnsafeLinkBuffer) resetTail(maxSize int) { - // FIXME: Reset should be removed when find a decent way to reuse buffer if maxSize <= pagesize { - b.write.Reset() + // no need to reset a small buffer tail node return } // set nil tail diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 01661bee..1e84fabb 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -21,6 +21,7 @@ import ( "bytes" "encoding/binary" "fmt" + "runtime" "sync/atomic" "testing" ) @@ -522,91 +523,97 @@ func TestLinkBufferWriteDirect(t *testing.T) { } } -//func TestLinkBufferNoCopyWriteAndRead(t *testing.T) { -// // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B] -// const ( -// mallocLen = 4096 * 2 -// originLen = 4096 -// dataLen = 512 -// newLen = 16 -// normalLen = 4096 -// ) -// buf := NewLinkBuffer() -// bt, _ := buf.Malloc(mallocLen) -// originBuf := bt[:originLen] -// newBuf := bt[originLen : originLen+newLen] -// -// // write origin_node -// for i := 0; i < originLen; i++ { -// bt[i] = 'a' -// } -// // write data_node -// userBuf := make([]byte, dataLen) -// for i := 0; i < len(userBuf); i++ { -// userBuf[i] = 'b' -// } -// buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write -// // write new_node -// for i := 0; i < newLen; i++ { -// bt[originLen+i] = 'c' -// } -// buf.MallocAck(originLen + dataLen + newLen) -// buf.Flush() -// // write normal_node -// normalBuf, _ := buf.Malloc(normalLen) -// for i := 0; i < normalLen; i++ { -// normalBuf[i] = 'd' -// } -// buf.Flush() -// Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen) -// -// // copy read origin_node -// bt, _ = buf.ReadBinary(originLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'a') -// } -// MustTrue(t, &bt[0] != &originBuf[0]) -// // next read node is data node and must be readonly and non-reusable -// MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable()) -// // copy read data_node -// bt, _ = buf.ReadBinary(dataLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'b') -// } -// MustTrue(t, &bt[0] != &userBuf[0]) -// // copy read new_node -// bt, _ = buf.ReadBinary(newLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'c') -// } -// MustTrue(t, &bt[0] != &newBuf[0]) -// // current read node is the new node and must not be reusable -// newnode := buf.read -// t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask)) -// MustTrue(t, newnode.reusable()) -// var nodeReleased int32 -// runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) { -// atomic.AddInt32(&nodeReleased, 1) -// }) -// // nocopy read normal_node -// bt, _ = buf.ReadBinary(normalLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'd') -// } -// MustTrue(t, &bt[0] == &normalBuf[0]) -// // normal buffer never should be released -// runtime.SetFinalizer(&bt[0], func(_ *byte) { -// atomic.AddInt32(&nodeReleased, 1) -// }) -// _ = buf.Release() -// MustTrue(t, newnode.buf == nil) -// for atomic.LoadInt32(&nodeReleased) == 0 { -// runtime.GC() -// t.Log("newnode release checking") -// } -// Equal(t, atomic.LoadInt32(&nodeReleased), int32(1)) -// runtime.KeepAlive(normalBuf) -//} +func TestLinkBufferNoCopyWriteAndRead(t *testing.T) { + err := Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}}) + MustNil(t, err) + defer func() { + err = Configure(Config{Feature: Feature{AlwaysNoCopyRead: false}}) + MustNil(t, err) + }() + // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B] + const ( + mallocLen = 4096 * 2 + originLen = 4096 + dataLen = 512 + newLen = 16 + normalLen = 4096 + ) + buf := NewLinkBuffer() + bt, _ := buf.Malloc(mallocLen) + originBuf := bt[:originLen] + newBuf := bt[originLen : originLen+newLen] + + // write origin_node + for i := 0; i < originLen; i++ { + bt[i] = 'a' + } + // write data_node + userBuf := make([]byte, dataLen) + for i := 0; i < len(userBuf); i++ { + userBuf[i] = 'b' + } + buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write + // write new_node + for i := 0; i < newLen; i++ { + bt[originLen+i] = 'c' + } + buf.MallocAck(originLen + dataLen + newLen) + buf.Flush() + // write normal_node + normalBuf, _ := buf.Malloc(normalLen) + for i := 0; i < normalLen; i++ { + normalBuf[i] = 'd' + } + buf.Flush() + Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen) + + // copy read origin_node + bt, _ = buf.ReadBinary(originLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'a') + } + MustTrue(t, &bt[0] != &originBuf[0]) + // next read node is data node and must be readonly and non-reusable + MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable()) + // copy read data_node + bt, _ = buf.ReadBinary(dataLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'b') + } + MustTrue(t, &bt[0] != &userBuf[0]) + // copy read new_node + bt, _ = buf.ReadBinary(newLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'c') + } + MustTrue(t, &bt[0] != &newBuf[0]) + // current read node is the new node and must not be reusable + newnode := buf.read + t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask)) + MustTrue(t, newnode.reusable()) + var nodeReleased int32 + runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) { + atomic.AddInt32(&nodeReleased, 1) + }) + // nocopy read normal_node + bt, _ = buf.ReadBinary(normalLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'd') + } + MustTrue(t, &bt[0] == &normalBuf[0]) + // normal buffer never should be released + runtime.SetFinalizer(&bt[0], func(_ *byte) { + atomic.AddInt32(&nodeReleased, 1) + }) + _ = buf.Release() + MustTrue(t, newnode.buf == nil) + for atomic.LoadInt32(&nodeReleased) == 0 { + runtime.GC() + t.Log("newnode release checking") + } + Equal(t, atomic.LoadInt32(&nodeReleased), int32(1)) + runtime.KeepAlive(normalBuf) +} func TestLinkBufferBufferMode(t *testing.T) { bufnode := newLinkBufferNode(0) diff --git a/poll_default_linux.go b/poll_default_linux.go index a0087ee0..e1215fb5 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -238,6 +238,12 @@ func (p *defaultPoll) Trigger() error { // Control implements Poll. func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { + // DON'T move `fd=operator.FD` behind inuse() call, we can only access operator before op.inuse() for avoid race + // G1: G2: + // op.inuse() op.unused() + // op.FD -- T1 op.FD = 0 -- T2 + // T1 and T2 may happen together + var fd = operator.FD var op int var evt epollevent p.setOperator(unsafe.Pointer(&evt.data), operator) @@ -256,5 +262,5 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { case PollRW2R: // connection wait read op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR } - return EpollCtl(p.fd, op, operator.FD, &evt) + return EpollCtl(p.fd, op, fd, &evt) } diff --git a/poll_loadbalance.go b/poll_loadbalance.go index 33c3ae54..6bbbc039 100644 --- a/poll_loadbalance.go +++ b/poll_loadbalance.go @@ -24,17 +24,17 @@ import ( type LoadBalance int const ( - // Random requests that connections are randomly distributed. - Random LoadBalance = iota // RoundRobin requests that connections are distributed to a Poll // in a round-robin fashion. - RoundRobin + RoundRobin LoadBalance = iota + // Random requests that connections are randomly distributed. + Random ) // loadbalance sets the load balancing method for []*polls type loadbalance interface { LoadBalance() LoadBalance - // Choose the most qualified Poll + // Pick choose the most qualified Poll Pick() (poll Poll) Rebalance(polls []Poll) @@ -42,10 +42,10 @@ type loadbalance interface { func newLoadbalance(lb LoadBalance, polls []Poll) loadbalance { switch lb { - case Random: - return newRandomLB(polls) case RoundRobin: return newRoundRobinLB(polls) + case Random: + return newRandomLB(polls) } return newRoundRobinLB(polls) } diff --git a/poll_manager.go b/poll_manager.go index 4183ac3d..602250e4 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -19,39 +19,10 @@ package netpoll import ( "fmt" - "io" - "log" - "os" "runtime" "sync/atomic" ) -func setNumLoops(numLoops int) error { - return pollmanager.SetNumLoops(numLoops) -} - -func setLoadBalance(lb LoadBalance) error { - return pollmanager.SetLoadBalance(lb) -} - -func initialize() { - // The first call of Pick() will init pollers - _ = pollmanager.Pick() -} - -func setLoggerOutput(w io.Writer) { - logger = log.New(w, "", log.LstdFlags) -} - -// pollmanager manage all pollers -var pollmanager *manager -var logger *log.Logger - -func init() { - pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) - setLoggerOutput(os.Stderr) -} - const ( managerUninitialized = iota managerInitializing diff --git a/poll_manager_test.go b/poll_manager_test.go index 63559051..c5648a76 100644 --- a/poll_manager_test.go +++ b/poll_manager_test.go @@ -21,14 +21,15 @@ import ( "runtime" "sync" "testing" - "time" ) func TestPollManager(t *testing.T) { r, w := GetSysFdPairs() var rconn, wconn = &connection{}, &connection{} - rconn.init(&netFD{fd: r}, nil) - wconn.init(&netFD{fd: w}, nil) + err := rconn.init(&netFD{fd: r}, nil) + MustNil(t, err) + err = wconn.init(&netFD{fd: w}, nil) + MustNil(t, err) var msg = []byte("hello world") n, err := wconn.Write(msg) @@ -41,9 +42,9 @@ func TestPollManager(t *testing.T) { err = wconn.Close() MustNil(t, err) - time.Sleep(10 * time.Millisecond) - MustTrue(t, !rconn.IsActive()) - MustTrue(t, !wconn.IsActive()) + for rconn.IsActive() || wconn.IsActive() { + runtime.Gosched() + } } func TestPollManagerReset(t *testing.T) { @@ -79,10 +80,7 @@ func TestPollManagerSetNumLoops(t *testing.T) { wg.Add(1) go func() { poll := pm.Pick() - newGs := runtime.NumGoroutine() - t.Logf("old=%d, new=%d", oldGs, newGs) Assert(t, poll != nil) - Assert(t, newGs-oldGs == 100) Assert(t, len(pm.polls) == 100) wg.Done() <-finish // hold goroutines