From 6039df799d81c67a6ad1702a2b0569eae751e13e Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Wed, 13 Mar 2024 19:08:37 +0800 Subject: [PATCH] feat: epoll managed by runtime netpoller --- poll_default_bsd.go | 11 ++++- poll_default_linux.go | 37 ++++++++++++--- poll_default_linux_test.go | 94 ++++++++++++++++++++++++++++---------- poll_manager.go | 2 +- runtime.go | 28 ++++++++++++ 5 files changed, 140 insertions(+), 32 deletions(-) create mode 100644 runtime.go diff --git a/poll_default_bsd.go b/poll_default_bsd.go index 9c8aa8c9..5b8657da 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -19,19 +19,28 @@ package netpoll import ( "errors" + "runtime" "sync" "sync/atomic" "syscall" "unsafe" ) +func defaultPollNum() int { + return runtime.GOMAXPROCS(0)/20 + 1 +} + +func openPollFile() (int, error) { + return syscall.Kqueue() +} + func openPoll() (Poll, error) { return openDefaultPoll() } func openDefaultPoll() (*defaultPoll, error) { l := new(defaultPoll) - p, err := syscall.Kqueue() + p, err := openPollFile() if err != nil { return nil, err } diff --git a/poll_default_linux.go b/poll_default_linux.go index a0087ee0..50591474 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -16,6 +16,7 @@ package netpoll import ( "errors" + "fmt" "runtime" "sync" "sync/atomic" @@ -23,6 +24,16 @@ import ( "unsafe" ) +func defaultPollNum() int { + // more pollers could help poller schedule user work to different P to increase parallelism, + // but also will decrease poller's epoll efficiency, so it's a trade-off + return runtime.GOMAXPROCS(0) / 2 +} + +func openPollFile() (int, error) { + return EpollCreate(0) +} + func openPoll() (Poll, error) { return openDefaultPoll() } @@ -31,11 +42,17 @@ func openDefaultPoll() (*defaultPoll, error) { var poll = new(defaultPoll) poll.buf = make([]byte, 8) - var p, err = EpollCreate(0) + var p, err = openPollFile() if err != nil { return nil, err } poll.fd = p + // register epollfd into runtime's netpoller + pd, 
errno := runtime_pollOpen(uintptr(poll.fd)) + if errno != 0 { + return nil, Exception(ErrUnsupported, fmt.Sprintf("when poll open: errno=%d", errno)) + } + poll.pd = pd var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) if e0 != 0 { @@ -60,6 +77,7 @@ func openDefaultPoll() (*defaultPoll, error) { type defaultPoll struct { pollArgs fd int // epoll fd + pd uintptr // the pollDesc of epoll fd in runtime's netpoller wop *FDOperator // eventfd, wake epoll_wait buf []byte // read wfd trigger msg trigger uint32 // trigger flag @@ -90,23 +108,28 @@ func (a *pollArgs) reset(size, caps int) { // Wait implements Poll. func (p *defaultPoll) Wait() (err error) { // init - var caps, msec, n = barriercap, -1, 0 + var caps, n = barriercap, 0 p.Reset(128, caps) // wait for { if n == p.size && p.size < 128*1024 { p.Reset(p.size<<1, caps) } - n, err = EpollWait(p.fd, p.events, msec) + n, err = EpollWait(p.fd, p.events, 0) if err != nil && err != syscall.EINTR { return err } - if n <= 0 { - msec = -1 - runtime.Gosched() + if n == 0 { + errno := runtime_pollReset(p.pd, 'r') + if errno != 0 { + return Exception(ErrUnsupported, fmt.Sprintf("when poll reset: errno=%d", errno)) + } + errno = runtime_pollWait(p.pd, 'r') + if errno != 0 { + return Exception(ErrUnsupported, fmt.Sprintf("when poll wait: errno=%d", errno)) + } continue } - msec = 0 if p.Handler(p.events[:n]) { return nil } diff --git a/poll_default_linux_test.go b/poll_default_linux_test.go index 072963d7..8159bd83 100644 --- a/poll_default_linux_test.go +++ b/poll_default_linux_test.go @@ -21,8 +21,7 @@ import ( "errors" "syscall" "testing" - - "golang.org/x/sys/unix" + "time" ) func TestEpollEvent(t *testing.T) { @@ -54,11 +53,11 @@ func TestEpollEvent(t *testing.T) { } // EPOLL: add ,del and add - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event1) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event1) + err = EpollCtl(epollfd, 
syscall.EPOLL_CTL_DEL, rfd, event1) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event2) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event2) MustNil(t, err) _, err = syscall.Write(wfd, send) MustNil(t, err) @@ -68,15 +67,15 @@ func TestEpollEvent(t *testing.T) { Equal(t, events[0].data, eventdata2) _, err = syscall.Read(rfd, recv) MustTrue(t, err == nil && string(recv) == string(send)) - err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event2) MustNil(t, err) // EPOLL: add ,mod and mod - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event1) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event2) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, rfd, event2) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event3) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, rfd, event3) MustNil(t, err) _, err = syscall.Write(wfd, send) MustNil(t, err) @@ -88,7 +87,7 @@ func TestEpollEvent(t *testing.T) { Assert(t, events[0].events&syscall.EPOLLIN != 0) Assert(t, events[0].events&syscall.EPOLLOUT != 0) - err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event2) MustNil(t, err) } @@ -110,7 +109,7 @@ func TestEpollWait(t *testing.T) { events: syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR, data: eventdata, } - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event) MustNil(t, err) _, err = epollWaitUntil(epollfd, events, -1) MustNil(t, err) @@ -146,7 +145,7 @@ func TestEpollWait(t *testing.T) { // EPOLL: close current fd rfd2, wfd2 := GetSysFdPairs() defer syscall.Close(wfd2) - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd2, event) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd2, event) err = syscall.Close(rfd2) MustNil(t, err) _, err 
= epollWaitUntil(epollfd, events, -1) @@ -156,7 +155,7 @@ func TestEpollWait(t *testing.T) { Assert(t, events[0].events&syscall.EPOLLRDHUP != 0) Assert(t, events[0].events&syscall.EPOLLERR == 0) - err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event) MustNil(t, err) } @@ -173,7 +172,7 @@ func TestEpollETClose(t *testing.T) { } // EPOLL: init state - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event) _, err = epollWaitUntil(epollfd, events, -1) MustNil(t, err) Assert(t, events[0].events&syscall.EPOLLIN == 0) @@ -194,7 +193,7 @@ func TestEpollETClose(t *testing.T) { // EPOLL: close peer fd // EPOLLIN and EPOLLOUT rfd, wfd = GetSysFdPairs() - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event) err = syscall.Close(wfd) MustNil(t, err) n, err = epollWaitUntil(epollfd, events, 100) @@ -224,10 +223,10 @@ func TestEpollETDel(t *testing.T) { } // EPOLL: del partly - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event) MustNil(t, err) event.events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR - err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event) MustNil(t, err) _, err = syscall.Write(wfd, send) MustNil(t, err) @@ -268,7 +267,7 @@ func TestEpollConnectSameFD(t *testing.T) { t.Logf("create fd: %d", fd1) err = syscall.SetNonblock(fd1, true) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd1, event1) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd1, event1) MustNil(t, err) err = syscall.Connect(fd1, &addr) t.Log(err) @@ -278,7 +277,7 @@ func TestEpollConnectSameFD(t *testing.T) { //Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) //Assert(t, events[0].events&syscall.EPOLLERR == 0) // forget to del fd - //err = 
EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1) + //err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, fd1, event1) //MustNil(t, err) err = syscall.Close(fd1) // close fd1 MustNil(t, err) @@ -289,7 +288,7 @@ func TestEpollConnectSameFD(t *testing.T) { t.Logf("create fd: %d", fd2) err = syscall.SetNonblock(fd2, true) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd2, event2) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd2, event2) MustNil(t, err) err = syscall.Connect(fd2, &addr) t.Log(err) @@ -298,7 +297,7 @@ func TestEpollConnectSameFD(t *testing.T) { Assert(t, events[0].events&syscall.EPOLLOUT != 0) Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) Assert(t, events[0].events&syscall.EPOLLERR == 0) - err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd2, event2) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, fd2, event2) MustNil(t, err) err = syscall.Close(fd2) // close fd2 MustNil(t, err) @@ -310,7 +309,7 @@ func TestEpollConnectSameFD(t *testing.T) { t.Logf("create fd: %d", fd3) err = syscall.SetNonblock(fd3, true) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd3, event1) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd3, event1) MustNil(t, err) err = syscall.Connect(fd3, &addr) t.Log(err) @@ -320,7 +319,7 @@ func TestEpollConnectSameFD(t *testing.T) { Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) Assert(t, events[0].events&syscall.EPOLLERR == 0) MustNil(t, err) - err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, fd3, eventin) + err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, fd3, eventin) MustNil(t, err) err = syscall.Close(fd3) // close fd3 MustNil(t, err) @@ -329,6 +328,55 @@ func TestEpollConnectSameFD(t *testing.T) { Assert(t, n == 0) } +func TestRuntimeNetpoller(t *testing.T) { + pfd, err := openPollFile() + MustNil(t, err) + + pd, errno := runtime_pollOpen(uintptr(pfd)) + Assert(t, errno == 0, errno) + t.Logf("poll open success: pd=%d", pd) + + var rfd, wfd = GetSysFdPairs() + + eventin := &epollevent{ + events: 
syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR, + data: [8]byte{0, 0, 0, 0, 0, 0, 0, 1}, + } + err = EpollCtl(pfd, syscall.EPOLL_CTL_ADD, rfd, eventin) + MustNil(t, err) + + go func() { + time.Sleep(time.Millisecond * 100) + + iovec := [1]syscall.Iovec{} + buf := []byte("hello") + n, err := writev(wfd, [][]byte{buf}, iovec[:]) + MustNil(t, err) + Equal(t, n, 5) + t.Logf("poll read success: %s", string(buf[:n])) + }() + + begin := time.Now() + errno = runtime_pollWait(pd, 'r'+'w') + Assert(t, errno == 0, errno) + cost := time.Since(begin) + Assert(t, cost.Milliseconds() >= 100) + + events := make([]epollevent, 1) + n, err := EpollWait(pfd, events, 0) + MustNil(t, err) + Equal(t, n, 1) + t.Logf("poll wait success") + + iovec := [1]syscall.Iovec{} + buf := make([]byte, 1024) + bs := [1][]byte{buf} + n, err = readv(rfd, bs[:], iovec[:]) + MustNil(t, err) + Equal(t, n, 5) + t.Logf("poll read success: %s", string(buf[:n])) +} + func epollWaitUntil(epfd int, events []epollevent, msec int) (n int, err error) { WAIT: n, err = EpollWait(epfd, events, msec) diff --git a/poll_manager.go b/poll_manager.go index 4183ac3d..965f6c96 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -48,7 +48,7 @@ var pollmanager *manager var logger *log.Logger func init() { - pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) + pollmanager = newManager(defaultPollNum()) setLoggerOutput(os.Stderr) } diff --git a/runtime.go b/runtime.go new file mode 100644 index 00000000..5757ff0d --- /dev/null +++ b/runtime.go @@ -0,0 +1,28 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import ( + _ "unsafe" +) + +//go:linkname runtime_pollOpen internal/poll.runtime_pollOpen +func runtime_pollOpen(fd uintptr) (pd uintptr, errno int) + +//go:linkname runtime_pollWait internal/poll.runtime_pollWait +func runtime_pollWait(pd uintptr, mode int) (errno int) + +//go:linkname runtime_pollReset internal/poll.runtime_pollReset +func runtime_pollReset(pd uintptr, mode int) (errno int)