From eeceef679c413cf0d7671707ad29076a34568c6c Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Mon, 22 Jul 2024 17:47:46 +0800 Subject: [PATCH 1/3] feat: expose config and features control --- connection_impl.go | 4 +- netpoll_options.go | 85 ++++++++++++++++-- nocopy_linkbuffer.go | 26 +++--- nocopy_linkbuffer_test.go | 177 ++++++++++++++++++++------------------ poll_loadbalance.go | 12 +-- poll_manager.go | 29 ------- 6 files changed, 189 insertions(+), 144 deletions(-) diff --git a/connection_impl.go b/connection_impl.go index bb2609b6..db2f8d69 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -330,8 +330,8 @@ func (c *connection) init(conn Conn, opts *options) (err error) { // init buffer, barrier, finalizer c.readTrigger = make(chan error, 1) c.writeTrigger = make(chan error, 1) - c.bookSize, c.maxSize = pagesize, pagesize - c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer() + c.bookSize, c.maxSize = defaultLinkBufferSize, defaultLinkBufferSize + c.inputBuffer, c.outputBuffer = NewLinkBuffer(defaultLinkBufferSize), NewLinkBuffer() c.outputBarrier = barrierPool.Get().(*barrier) c.state = 0 diff --git a/netpoll_options.go b/netpoll_options.go index 2cdb1c13..0540ddf9 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -20,9 +20,75 @@ package netpoll import ( "context" "io" + "log" + "os" + "runtime" "time" ) +var ( + pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manage all pollers + logger = log.New(os.Stderr, "", log.LstdFlags) + + // global config + defaultLinkBufferSize = pagesize + featureAlwaysNoCopyRead = false +) + +// Config expose some tuning parameters to control the internal behaviors of netpoll. +// Every parameter with the default zero value should keep the default behavior of netpoll. +type Config struct { + PollerNum int // number of pollers + BufferSize int // default size of a new connection's LinkBuffer + Runner func(ctx context.Context, f func()) // runner for event handler, most of the time use a goroutine pool. + LoggerOutput io.Writer // logger output + LoadBalance LoadBalance // load balance for poller picker + Feature // define all features that not enable by default +} + +// Feature expose some new features maybe promoted as a default behavior but not yet. +type Feature struct { + // AlwaysNoCopyRead allows some copy Read functions like ReadBinary/ReadString + // will use NoCopy read and will not reuse the underlying buffer. + // It gains more performance benefits when need read much big string/bytes in codec. + AlwaysNoCopyRead bool +} + +// Configure the internal behaviors of netpoll. +// Configure must called in init() function, because the poller will read some global variable after init() finished +func Configure(config Config) (err error) { + if config.PollerNum > 0 { + if err = pollmanager.SetNumLoops(config.PollerNum); err != nil { + return err + } + } + if config.BufferSize > 0 { + defaultLinkBufferSize = config.BufferSize + } + + if config.Runner != nil { + setRunner(config.Runner) + } + if config.LoggerOutput != nil { + logger = log.New(config.LoggerOutput, "", log.LstdFlags) + } + if config.LoadBalance >= 0 { + if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil { + return err + } + } + + featureAlwaysNoCopyRead = config.AlwaysNoCopyRead + return nil +} + +// Initialize the pollers actively. By default, it's lazy initialized. +// It's safe to call it multi times. +func Initialize() { + // The first call of Pick() will init pollers + _ = pollmanager.Pick() +} + // SetNumLoops is used to set the number of pollers, generally do not need to actively set. // By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1. // If the number of cores in your service process is less than 20c, theoretically only one poller is needed. @@ -34,28 +100,28 @@ import ( // func init() { // netpoll.SetNumLoops(...) // } +// +// Deprecated: use Configure instead. func SetNumLoops(numLoops int) error { - return setNumLoops(numLoops) + return pollmanager.SetNumLoops(numLoops) } // SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt // to distribute the incoming connections between multiple polls. // This option only works when numLoops is set. +// Deprecated: use Configure instead. func SetLoadBalance(lb LoadBalance) error { - return setLoadBalance(lb) -} - -// Initialize the pollers actively. By default, it's lazy initialized. -// It's safe to call it multi times. -func Initialize() { - initialize() + return pollmanager.SetLoadBalance(lb) } +// SetLoggerOutput sets the logger output target. +// Deprecated: use Configure instead. func SetLoggerOutput(w io.Writer) { - setLoggerOutput(w) + logger = log.New(w, "", log.LstdFlags) } // SetRunner set the runner function for every OnRequest/OnConnect callback +// Deprecated: use Configure instead. func SetRunner(f func(ctx context.Context, f func())) { setRunner(f) } @@ -65,6 +131,7 @@ func SetRunner(f func(ctx context.Context, f func())) { // Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. // But if you can confirm that the OnRequest will not cause stack expansion, // it is recommended to use DisableGopool to reduce redundancy and improve performance. +// Deprecated: use Configure instead. func DisableGopool() error { return disableGopool() } diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 3762867c..a56b3b64 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -252,19 +252,19 @@ func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) { // single node if b.isSingleNode(n) { // TODO: enable nocopy read mode when ensure no legacy depend on copy-read - //// we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself - //if !b.read.getMode(readonlyMask) { - // // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently - // // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec - // // no need to malloc 10 times and the string slice could have the compact memory allocation. - // if b.read.getMode(nocopyReadMask) { - // return b.read.Next(n) - // } - // if n >= minReuseBytes && cap(b.read.buf) <= block32k { - // b.read.setMode(nocopyReadMask, true) - // return b.read.Next(n) - // } - //} + // we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself + if !b.read.getMode(readonlyMask) { + // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently + // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec + // no need to malloc 10 times and the string slice could have the compact memory allocation. + if b.read.getMode(nocopyReadMask) { + return b.read.Next(n) + } + if featureAlwaysNoCopyRead && n >= minReuseBytes { + b.read.setMode(nocopyReadMask, true) + return b.read.Next(n) + } + } // if the underlying buffer too large, we shouldn't use no-copy mode p = dirtmake.Bytes(n, n) copy(p, b.read.Next(n)) diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 01661bee..1e84fabb 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -21,6 +21,7 @@ import ( "bytes" "encoding/binary" "fmt" + "runtime" "sync/atomic" "testing" ) @@ -522,91 +523,97 @@ func TestLinkBufferWriteDirect(t *testing.T) { } } -//func TestLinkBufferNoCopyWriteAndRead(t *testing.T) { -// // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B] -// const ( -// mallocLen = 4096 * 2 -// originLen = 4096 -// dataLen = 512 -// newLen = 16 -// normalLen = 4096 -// ) -// buf := NewLinkBuffer() -// bt, _ := buf.Malloc(mallocLen) -// originBuf := bt[:originLen] -// newBuf := bt[originLen : originLen+newLen] -// -// // write origin_node -// for i := 0; i < originLen; i++ { -// bt[i] = 'a' -// } -// // write data_node -// userBuf := make([]byte, dataLen) -// for i := 0; i < len(userBuf); i++ { -// userBuf[i] = 'b' -// } -// buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write -// // write new_node -// for i := 0; i < newLen; i++ { -// bt[originLen+i] = 'c' -// } -// buf.MallocAck(originLen + dataLen + newLen) -// buf.Flush() -// // write normal_node -// normalBuf, _ := buf.Malloc(normalLen) -// for i := 0; i < normalLen; i++ { -// normalBuf[i] = 'd' -// } -// buf.Flush() -// Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen) -// -// // copy read origin_node -// bt, _ = buf.ReadBinary(originLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'a') -// } -// MustTrue(t, &bt[0] != &originBuf[0]) -// // next read node is data node and must be readonly and non-reusable -// MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable()) -// // copy read data_node -// bt, _ = buf.ReadBinary(dataLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'b') -// } -// MustTrue(t, &bt[0] != &userBuf[0]) -// // copy read new_node -// bt, _ = buf.ReadBinary(newLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'c') -// } -// MustTrue(t, &bt[0] != &newBuf[0]) -// // current read node is the new node and must not be reusable -// newnode := buf.read -// t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask)) -// MustTrue(t, newnode.reusable()) -// var nodeReleased int32 -// runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) { -// atomic.AddInt32(&nodeReleased, 1) -// }) -// // nocopy read normal_node -// bt, _ = buf.ReadBinary(normalLen) -// for i := 0; i < len(bt); i++ { -// MustTrue(t, bt[i] == 'd') -// } -// MustTrue(t, &bt[0] == &normalBuf[0]) -// // normal buffer never should be released -// runtime.SetFinalizer(&bt[0], func(_ *byte) { -// atomic.AddInt32(&nodeReleased, 1) -// }) -// _ = buf.Release() -// MustTrue(t, newnode.buf == nil) -// for atomic.LoadInt32(&nodeReleased) == 0 { -// runtime.GC() -// t.Log("newnode release checking") -// } -// Equal(t, atomic.LoadInt32(&nodeReleased), int32(1)) -// runtime.KeepAlive(normalBuf) -//} +func TestLinkBufferNoCopyWriteAndRead(t *testing.T) { + err := Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}}) + MustNil(t, err) + defer func() { + err = Configure(Config{Feature: Feature{AlwaysNoCopyRead: false}}) + MustNil(t, err) + }() + // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B] + const ( + mallocLen = 4096 * 2 + originLen = 4096 + dataLen = 512 + newLen = 16 + normalLen = 4096 + ) + buf := NewLinkBuffer() + bt, _ := buf.Malloc(mallocLen) + originBuf := bt[:originLen] + newBuf := bt[originLen : originLen+newLen] + + // write origin_node + for i := 0; i < originLen; i++ { + bt[i] = 'a' + } + // write data_node + userBuf := make([]byte, dataLen) + for i := 0; i < len(userBuf); i++ { + userBuf[i] = 'b' + } + buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write + // write new_node + for i := 0; i < newLen; i++ { + bt[originLen+i] = 'c' + } + buf.MallocAck(originLen + dataLen + newLen) + buf.Flush() + // write normal_node + normalBuf, _ := buf.Malloc(normalLen) + for i := 0; i < normalLen; i++ { + normalBuf[i] = 'd' + } + buf.Flush() + Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen) + + // copy read origin_node + bt, _ = buf.ReadBinary(originLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'a') + } + MustTrue(t, &bt[0] != &originBuf[0]) + // next read node is data node and must be readonly and non-reusable + MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable()) + // copy read data_node + bt, _ = buf.ReadBinary(dataLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'b') + } + MustTrue(t, &bt[0] != &userBuf[0]) + // copy read new_node + bt, _ = buf.ReadBinary(newLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'c') + } + MustTrue(t, &bt[0] != &newBuf[0]) + // current read node is the new node and must not be reusable + newnode := buf.read + t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask)) + MustTrue(t, newnode.reusable()) + var nodeReleased int32 + runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) { + atomic.AddInt32(&nodeReleased, 1) + }) + // nocopy read normal_node + bt, _ = buf.ReadBinary(normalLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'd') + } + MustTrue(t, &bt[0] == &normalBuf[0]) + // normal buffer never should be released + runtime.SetFinalizer(&bt[0], func(_ *byte) { + atomic.AddInt32(&nodeReleased, 1) + }) + _ = buf.Release() + MustTrue(t, newnode.buf == nil) + for atomic.LoadInt32(&nodeReleased) == 0 { + runtime.GC() + t.Log("newnode release checking") + } + Equal(t, atomic.LoadInt32(&nodeReleased), int32(1)) + runtime.KeepAlive(normalBuf) +} func TestLinkBufferBufferMode(t *testing.T) { bufnode := newLinkBufferNode(0) diff --git a/poll_loadbalance.go b/poll_loadbalance.go index 33c3ae54..6bbbc039 100644 --- a/poll_loadbalance.go +++ b/poll_loadbalance.go @@ -24,17 +24,17 @@ import ( type LoadBalance int const ( - // Random requests that connections are randomly distributed. - Random LoadBalance = iota // RoundRobin requests that connections are distributed to a Poll // in a round-robin fashion. - RoundRobin + RoundRobin LoadBalance = iota + // Random requests that connections are randomly distributed. + Random ) // loadbalance sets the load balancing method for []*polls type loadbalance interface { LoadBalance() LoadBalance - // Choose the most qualified Poll + // Pick choose the most qualified Poll Pick() (poll Poll) Rebalance(polls []Poll) @@ -42,10 +42,10 @@ type loadbalance interface { func newLoadbalance(lb LoadBalance, polls []Poll) loadbalance { switch lb { - case Random: - return newRandomLB(polls) case RoundRobin: return newRoundRobinLB(polls) + case Random: + return newRandomLB(polls) } return newRoundRobinLB(polls) } diff --git a/poll_manager.go b/poll_manager.go index 4183ac3d..602250e4 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -19,39 +19,10 @@ package netpoll import ( "fmt" - "io" - "log" - "os" "runtime" "sync/atomic" ) -func setNumLoops(numLoops int) error { - return pollmanager.SetNumLoops(numLoops) -} - -func setLoadBalance(lb LoadBalance) error { - return pollmanager.SetLoadBalance(lb) -} - -func initialize() { - // The first call of Pick() will init pollers - _ = pollmanager.Pick() -} - -func setLoggerOutput(w io.Writer) { - logger = log.New(w, "", log.LstdFlags) -} - -// pollmanager manage all pollers -var pollmanager *manager -var logger *log.Logger - -func init() { - pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) - setLoggerOutput(os.Stderr) -} - const ( managerUninitialized = iota managerInitializing From 2bf73f2468a1db70a86c284ecf2b5157f94c2248 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 23 Jul 2024 11:07:13 +0800 Subject: [PATCH 2/3] fix: dont reset tail read node --- connection_test.go | 7 +++++++ netpoll_options.go | 3 --- nocopy_linkbuffer.go | 3 +-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/connection_test.go b/connection_test.go index 2f1c8665..eb28a0e4 100644 --- a/connection_test.go +++ b/connection_test.go @@ -129,6 +129,13 @@ func TestConnectionRead(t *testing.T) { } func TestConnectionNoCopyReadString(t *testing.T) { + err := Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}}) + MustNil(t, err) + defer func() { + err = Configure(Config{Feature: Feature{AlwaysNoCopyRead: false}}) + MustNil(t, err) + }() + r, w := GetSysFdPairs() var rconn, wconn = &connection{}, &connection{} rconn.init(&netFD{fd: r}, nil) diff --git a/netpoll_options.go b/netpoll_options.go index 0540ddf9..7b225256 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !windows -// +build !windows - package netpoll import ( diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index a56b3b64..7843767f 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -675,9 +675,8 @@ func (b *UnsafeLinkBuffer) calcMaxSize() (sum int) { // resetTail will reset tail node or add an empty tail node to // guarantee the tail node is not larger than 8KB func (b *UnsafeLinkBuffer) resetTail(maxSize int) { - // FIXME: Reset should be removed when find a decent way to reuse buffer if maxSize <= pagesize { - b.write.Reset() + // no need to reset a small buffer tail node return } // set nil tail From 144cd130a419d7940113cf26243fc21cdcbe8a20 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 23 Jul 2024 11:25:50 +0800 Subject: [PATCH 3/3] fix: support windows compiler --- netpoll.go | 89 ------------ netpoll_config.go | 45 ++++++ netpoll_options.go | 143 ++----------------- netpoll_unix.go | 179 ++++++++++++++++++++++++ netpoll_test.go => netpoll_unix_test.go | 0 netpoll_windows.go | 31 +--- 6 files changed, 241 insertions(+), 246 deletions(-) delete mode 100644 netpoll.go create mode 100644 netpoll_config.go create mode 100644 netpoll_unix.go rename netpoll_test.go => netpoll_unix_test.go (100%) diff --git a/netpoll.go b/netpoll.go deleted file mode 100644 index 7f53f2a2..00000000 --- a/netpoll.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2022 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux -// +build darwin netbsd freebsd openbsd dragonfly linux - -package netpoll - -import ( - "context" - "net" - "runtime" - "sync" -) - -// NewEventLoop . -func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) { - opts := &options{ - onRequest: onRequest, - } - for _, do := range ops { - do.f(opts) - } - return &eventLoop{ - opts: opts, - stop: make(chan error, 1), - }, nil -} - -type eventLoop struct { - sync.Mutex - opts *options - svr *server - stop chan error -} - -// Serve implements EventLoop. -func (evl *eventLoop) Serve(ln net.Listener) error { - npln, err := ConvertListener(ln) - if err != nil { - return err - } - evl.Lock() - evl.svr = newServer(npln, evl.opts, evl.quit) - evl.svr.Run() - evl.Unlock() - - err = evl.waitQuit() - // ensure evl will not be finalized until Serve returns - runtime.SetFinalizer(evl, nil) - return err -} - -// Shutdown signals a shutdown a begins server closing. -func (evl *eventLoop) Shutdown(ctx context.Context) error { - evl.Lock() - var svr = evl.svr - evl.svr = nil - evl.Unlock() - - if svr == nil { - return nil - } - evl.quit(nil) - return svr.Close(ctx) -} - -// waitQuit waits for a quit signal -func (evl *eventLoop) waitQuit() error { - return <-evl.stop -} - -func (evl *eventLoop) quit(err error) { - select { - case evl.stop <- err: - default: - } -} diff --git a/netpoll_config.go b/netpoll_config.go new file mode 100644 index 00000000..85c05925 --- /dev/null +++ b/netpoll_config.go @@ -0,0 +1,45 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import ( + "context" + "io" +) + +// global config +var ( + defaultLinkBufferSize = pagesize + featureAlwaysNoCopyRead = false +) + +// Config expose some tuning parameters to control the internal behaviors of netpoll. +// Every parameter with the default zero value should keep the default behavior of netpoll. +type Config struct { + PollerNum int // number of pollers + BufferSize int // default size of a new connection's LinkBuffer + Runner func(ctx context.Context, f func()) // runner for event handler, most of the time use a goroutine pool. + LoggerOutput io.Writer // logger output + LoadBalance LoadBalance // load balance for poller picker + Feature // define all features that not enable by default +} + +// Feature expose some new features maybe promoted as a default behavior but not yet. +type Feature struct { + // AlwaysNoCopyRead allows some copy Read functions like ReadBinary/ReadString + // will use NoCopy read and will not reuse the underlying buffer. + // It gains more performance benefits when need read much big string/bytes in codec. + AlwaysNoCopyRead bool +} diff --git a/netpoll_options.go b/netpoll_options.go index 7b225256..b72bba49 100644 --- a/netpoll_options.go +++ b/netpoll_options.go @@ -1,4 +1,4 @@ -// Copyright 2022 CloudWeGo Authors +// Copyright 2024 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,123 +14,21 @@ package netpoll -import ( - "context" - "io" - "log" - "os" - "runtime" - "time" -) +import "time" -var ( - pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manage all pollers - logger = log.New(os.Stderr, "", log.LstdFlags) - - // global config - defaultLinkBufferSize = pagesize - featureAlwaysNoCopyRead = false -) - -// Config expose some tuning parameters to control the internal behaviors of netpoll. -// Every parameter with the default zero value should keep the default behavior of netpoll. -type Config struct { - PollerNum int // number of pollers - BufferSize int // default size of a new connection's LinkBuffer - Runner func(ctx context.Context, f func()) // runner for event handler, most of the time use a goroutine pool. - LoggerOutput io.Writer // logger output - LoadBalance LoadBalance // load balance for poller picker - Feature // define all features that not enable by default -} - -// Feature expose some new features maybe promoted as a default behavior but not yet. -type Feature struct { - // AlwaysNoCopyRead allows some copy Read functions like ReadBinary/ReadString - // will use NoCopy read and will not reuse the underlying buffer. - // It gains more performance benefits when need read much big string/bytes in codec. - AlwaysNoCopyRead bool -} - -// Configure the internal behaviors of netpoll. -// Configure must called in init() function, because the poller will read some global variable after init() finished -func Configure(config Config) (err error) { - if config.PollerNum > 0 { - if err = pollmanager.SetNumLoops(config.PollerNum); err != nil { - return err - } - } - if config.BufferSize > 0 { - defaultLinkBufferSize = config.BufferSize - } - - if config.Runner != nil { - setRunner(config.Runner) - } - if config.LoggerOutput != nil { - logger = log.New(config.LoggerOutput, "", log.LstdFlags) - } - if config.LoadBalance >= 0 { - if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil { - return err - } - } - - featureAlwaysNoCopyRead = config.AlwaysNoCopyRead - return nil -} - -// Initialize the pollers actively. By default, it's lazy initialized. -// It's safe to call it multi times. -func Initialize() { - // The first call of Pick() will init pollers - _ = pollmanager.Pick() -} - -// SetNumLoops is used to set the number of pollers, generally do not need to actively set. -// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1. -// If the number of cores in your service process is less than 20c, theoretically only one poller is needed. -// Otherwise you may need to adjust the number of pollers to achieve the best results. -// Experience recommends assigning a poller every 20c. -// -// You can only use SetNumLoops before any connection is created. An example usage: -// -// func init() { -// netpoll.SetNumLoops(...) -// } -// -// Deprecated: use Configure instead. -func SetNumLoops(numLoops int) error { - return pollmanager.SetNumLoops(numLoops) -} - -// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt -// to distribute the incoming connections between multiple polls. -// This option only works when numLoops is set. -// Deprecated: use Configure instead. -func SetLoadBalance(lb LoadBalance) error { - return pollmanager.SetLoadBalance(lb) -} - -// SetLoggerOutput sets the logger output target. -// Deprecated: use Configure instead. -func SetLoggerOutput(w io.Writer) { - logger = log.New(w, "", log.LstdFlags) -} - -// SetRunner set the runner function for every OnRequest/OnConnect callback -// Deprecated: use Configure instead. -func SetRunner(f func(ctx context.Context, f func())) { - setRunner(f) +// Option . +type Option struct { + f func(*options) } -// DisableGopool will remove gopool(the goroutine pool used to run OnRequest), -// which means that OnRequest will be run via `go OnRequest(...)`. -// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. -// But if you can confirm that the OnRequest will not cause stack expansion, -// it is recommended to use DisableGopool to reduce redundancy and improve performance. -// Deprecated: use Configure instead. -func DisableGopool() error { - return disableGopool() +type options struct { + onPrepare OnPrepare + onConnect OnConnect + onDisconnect OnDisconnect + onRequest OnRequest + readTimeout time.Duration + writeTimeout time.Duration + idleTimeout time.Duration } // WithOnPrepare registers the OnPrepare method to EventLoop. @@ -174,18 +72,3 @@ func WithIdleTimeout(timeout time.Duration) Option { op.idleTimeout = timeout }} } - -// Option . -type Option struct { - f func(*options) -} - -type options struct { - onPrepare OnPrepare - onConnect OnConnect - onDisconnect OnDisconnect - onRequest OnRequest - readTimeout time.Duration - writeTimeout time.Duration - idleTimeout time.Duration -} diff --git a/netpoll_unix.go b/netpoll_unix.go new file mode 100644 index 00000000..4eb25a05 --- /dev/null +++ b/netpoll_unix.go @@ -0,0 +1,179 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build darwin || netbsd || freebsd || openbsd || dragonfly || linux +// +build darwin netbsd freebsd openbsd dragonfly linux + +package netpoll + +import ( + "context" + "io" + "log" + "net" + "os" + "runtime" + "sync" +) + +var ( + pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manage all pollers + logger = log.New(os.Stderr, "", log.LstdFlags) +) + +// Initialize the pollers actively. By default, it's lazy initialized. +// It's safe to call it multi times. +func Initialize() { + // The first call of Pick() will init pollers + _ = pollmanager.Pick() +} + +// Configure the internal behaviors of netpoll. +// Configure must called in init() function, because the poller will read some global variable after init() finished +func Configure(config Config) (err error) { + if config.PollerNum > 0 { + if err = pollmanager.SetNumLoops(config.PollerNum); err != nil { + return err + } + } + if config.BufferSize > 0 { + defaultLinkBufferSize = config.BufferSize + } + + if config.Runner != nil { + setRunner(config.Runner) + } + if config.LoggerOutput != nil { + logger = log.New(config.LoggerOutput, "", log.LstdFlags) + } + if config.LoadBalance >= 0 { + if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil { + return err + } + } + + featureAlwaysNoCopyRead = config.AlwaysNoCopyRead + return nil +} + +// SetNumLoops is used to set the number of pollers, generally do not need to actively set. +// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1. +// If the number of cores in your service process is less than 20c, theoretically only one poller is needed. +// Otherwise, you may need to adjust the number of pollers to achieve the best results. +// Experience recommends assigning a poller every 20c. +// +// You can only use SetNumLoops before any connection is created. An example usage: +// +// func init() { +// netpoll.SetNumLoops(...) +// } +// +// Deprecated: use Configure instead. +func SetNumLoops(numLoops int) error { + return pollmanager.SetNumLoops(numLoops) +} + +// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt +// to distribute the incoming connections between multiple polls. +// This option only works when numLoops is set. +// Deprecated: use Configure instead. +func SetLoadBalance(lb LoadBalance) error { + return pollmanager.SetLoadBalance(lb) +} + +// SetLoggerOutput sets the logger output target. +// Deprecated: use Configure instead. +func SetLoggerOutput(w io.Writer) { + logger = log.New(w, "", log.LstdFlags) +} + +// SetRunner set the runner function for every OnRequest/OnConnect callback +// Deprecated: use Configure instead. +func SetRunner(f func(ctx context.Context, f func())) { + setRunner(f) +} + +// DisableGopool will remove gopool(the goroutine pool used to run OnRequest), +// which means that OnRequest will be run via `go OnRequest(...)`. +// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine. +// But if you can confirm that the OnRequest will not cause stack expansion, +// it is recommended to use DisableGopool to reduce redundancy and improve performance. +// Deprecated: use Configure instead. +func DisableGopool() error { + return disableGopool() +} + +// NewEventLoop . +func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) { + opts := &options{ + onRequest: onRequest, + } + for _, do := range ops { + do.f(opts) + } + return &eventLoop{ + opts: opts, + stop: make(chan error, 1), + }, nil +} + +type eventLoop struct { + sync.Mutex + opts *options + svr *server + stop chan error +} + +// Serve implements EventLoop. +func (evl *eventLoop) Serve(ln net.Listener) error { + npln, err := ConvertListener(ln) + if err != nil { + return err + } + evl.Lock() + evl.svr = newServer(npln, evl.opts, evl.quit) + evl.svr.Run() + evl.Unlock() + + err = evl.waitQuit() + // ensure evl will not be finalized until Serve returns + runtime.SetFinalizer(evl, nil) + return err +} + +// Shutdown signals a shutdown a begins server closing. +func (evl *eventLoop) Shutdown(ctx context.Context) error { + evl.Lock() + var svr = evl.svr + evl.svr = nil + evl.Unlock() + + if svr == nil { + return nil + } + evl.quit(nil) + return svr.Close(ctx) +} + +// waitQuit waits for a quit signal +func (evl *eventLoop) waitQuit() error { + return <-evl.stop +} + +func (evl *eventLoop) quit(err error) { + select { + case evl.stop <- err: + default: + } +} diff --git a/netpoll_test.go b/netpoll_unix_test.go similarity index 100% rename from netpoll_test.go rename to netpoll_unix_test.go diff --git a/netpoll_windows.go b/netpoll_windows.go index 634d1ef9..86434e79 100644 --- a/netpoll_windows.go +++ b/netpoll_windows.go @@ -1,4 +1,4 @@ -// Copyright 2022 CloudWeGo Authors +// Copyright 2024 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,34 +20,11 @@ package netpoll import ( "net" - "time" ) -// Option . -type Option struct { - f func(*options) -} - -type options struct{} - -// WithOnPrepare registers the OnPrepare method to EventLoop. -func WithOnPrepare(onPrepare OnPrepare) Option { - return Option{} -} - -// WithOnConnect registers the OnConnect method to EventLoop. -func WithOnConnect(onConnect OnConnect) Option { - return Option{} -} - -// WithReadTimeout sets the read timeout of connections. -func WithReadTimeout(timeout time.Duration) Option { - return Option{} -} - -// WithIdleTimeout sets the idle timeout of connections. -func WithIdleTimeout(timeout time.Duration) Option { - return Option{} +// Configure the internal behaviors of netpoll. +func Configure(config Config) (err error) { + return nil } // NewDialer only support TCP and unix socket now.