Skip to content

Commit

Permalink
feat: epoll managered by runtime netpoller
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Mar 14, 2024
1 parent bb9c3f7 commit 6039df7
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 32 deletions.
11 changes: 10 additions & 1 deletion poll_default_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@ package netpoll

import (
"errors"
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)

func defaultPollNum() int {
return runtime.GOMAXPROCS(0)/20 + 1
}

func openPollFile() (int, error) {
return syscall.Kqueue()
}

func openPoll() (Poll, error) {
return openDefaultPoll()
}

func openDefaultPoll() (*defaultPoll, error) {
l := new(defaultPoll)
p, err := syscall.Kqueue()
p, err := openPollFile()
if err != nil {
return nil, err
}
Expand Down
37 changes: 30 additions & 7 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,24 @@ package netpoll

import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)

func defaultPollNum() int {
// more pollers could help poller schedule user work to different P to increase parallelism,
// but also will decrease poller's epoll efficiency, so it's a trade-off
return runtime.GOMAXPROCS(0) / 2
}

func openPollFile() (int, error) {
return EpollCreate(0)
}

func openPoll() (Poll, error) {
return openDefaultPoll()
}
Expand All @@ -31,11 +42,17 @@ func openDefaultPoll() (*defaultPoll, error) {
var poll = new(defaultPoll)

poll.buf = make([]byte, 8)
var p, err = EpollCreate(0)
var p, err = openPollFile()
if err != nil {
return nil, err
}
poll.fd = p
// register epollfd into runtime's netpoller
pd, errno := runtime_pollOpen(uintptr(poll.fd))
if errno != 0 {
return nil, Exception(ErrUnsupported, fmt.Sprintf("when poll open: errno=%d", errno))
}
poll.pd = pd

var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
if e0 != 0 {
Expand All @@ -60,6 +77,7 @@ func openDefaultPoll() (*defaultPoll, error) {
type defaultPoll struct {
pollArgs
fd int // epoll fd
pd uintptr // the pollDesc of epoll fd in runtime's netpoller
wop *FDOperator // eventfd, wake epoll_wait
buf []byte // read wfd trigger msg
trigger uint32 // trigger flag
Expand Down Expand Up @@ -90,23 +108,28 @@ func (a *pollArgs) reset(size, caps int) {
// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
// init
var caps, msec, n = barriercap, -1, 0
var caps, n = barriercap, 0
p.Reset(128, caps)
// wait
for {
if n == p.size && p.size < 128*1024 {
p.Reset(p.size<<1, caps)
}
n, err = EpollWait(p.fd, p.events, msec)
n, err = EpollWait(p.fd, p.events, 0)
if err != nil && err != syscall.EINTR {
return err
}
if n <= 0 {
msec = -1
runtime.Gosched()
if n == 0 {
errno := runtime_pollReset(p.pd, 'r')
if errno != 0 {
return Exception(ErrUnsupported, fmt.Sprintf("when poll reset: errno=%d", errno))
}
errno = runtime_pollWait(p.pd, 'r')
if errno != 0 {
return Exception(ErrUnsupported, fmt.Sprintf("when poll wait: errno=%d", errno))
}
continue
}
msec = 0
if p.Handler(p.events[:n]) {
return nil
}
Expand Down
94 changes: 71 additions & 23 deletions poll_default_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
"errors"
"syscall"
"testing"

"golang.org/x/sys/unix"
"time"
)

func TestEpollEvent(t *testing.T) {
Expand Down Expand Up @@ -54,11 +53,11 @@ func TestEpollEvent(t *testing.T) {
}

// EPOLL: add ,del and add
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event1)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event1)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event2)
MustNil(t, err)
_, err = syscall.Write(wfd, send)
MustNil(t, err)
Expand All @@ -68,15 +67,15 @@ func TestEpollEvent(t *testing.T) {
Equal(t, events[0].data, eventdata2)
_, err = syscall.Read(rfd, recv)
MustTrue(t, err == nil && string(recv) == string(send))
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event2)
MustNil(t, err)

// EPOLL: add ,mod and mod
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event1)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, rfd, event2)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event3)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, rfd, event3)
MustNil(t, err)
_, err = syscall.Write(wfd, send)
MustNil(t, err)
Expand All @@ -88,7 +87,7 @@ func TestEpollEvent(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLIN != 0)
Assert(t, events[0].events&syscall.EPOLLOUT != 0)

err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event2)
MustNil(t, err)
}

Expand All @@ -110,7 +109,7 @@ func TestEpollWait(t *testing.T) {
events: syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR,
data: eventdata,
}
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
MustNil(t, err)
_, err = epollWaitUntil(epollfd, events, -1)
MustNil(t, err)
Expand Down Expand Up @@ -146,7 +145,7 @@ func TestEpollWait(t *testing.T) {
// EPOLL: close current fd
rfd2, wfd2 := GetSysFdPairs()
defer syscall.Close(wfd2)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd2, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd2, event)
err = syscall.Close(rfd2)
MustNil(t, err)
_, err = epollWaitUntil(epollfd, events, -1)
Expand All @@ -156,7 +155,7 @@ func TestEpollWait(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLRDHUP != 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)

err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event)
MustNil(t, err)
}

Expand All @@ -173,7 +172,7 @@ func TestEpollETClose(t *testing.T) {
}

// EPOLL: init state
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
_, err = epollWaitUntil(epollfd, events, -1)
MustNil(t, err)
Assert(t, events[0].events&syscall.EPOLLIN == 0)
Expand All @@ -194,7 +193,7 @@ func TestEpollETClose(t *testing.T) {
// EPOLL: close peer fd
// EPOLLIN and EPOLLOUT
rfd, wfd = GetSysFdPairs()
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
err = syscall.Close(wfd)
MustNil(t, err)
n, err = epollWaitUntil(epollfd, events, 100)
Expand Down Expand Up @@ -224,10 +223,10 @@ func TestEpollETDel(t *testing.T) {
}

// EPOLL: del partly
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
MustNil(t, err)
event.events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event)
MustNil(t, err)
_, err = syscall.Write(wfd, send)
MustNil(t, err)
Expand Down Expand Up @@ -268,7 +267,7 @@ func TestEpollConnectSameFD(t *testing.T) {
t.Logf("create fd: %d", fd1)
err = syscall.SetNonblock(fd1, true)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd1, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd1, event1)
MustNil(t, err)
err = syscall.Connect(fd1, &addr)
t.Log(err)
Expand All @@ -278,7 +277,7 @@ func TestEpollConnectSameFD(t *testing.T) {
//Assert(t, events[0].events&syscall.EPOLLRDHUP == 0)
//Assert(t, events[0].events&syscall.EPOLLERR == 0)
// forget to del fd
//err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1)
//err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, fd1, event1)
//MustNil(t, err)
err = syscall.Close(fd1) // close fd1
MustNil(t, err)
Expand All @@ -289,7 +288,7 @@ func TestEpollConnectSameFD(t *testing.T) {
t.Logf("create fd: %d", fd2)
err = syscall.SetNonblock(fd2, true)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd2, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd2, event2)
MustNil(t, err)
err = syscall.Connect(fd2, &addr)
t.Log(err)
Expand All @@ -298,7 +297,7 @@ func TestEpollConnectSameFD(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLOUT != 0)
Assert(t, events[0].events&syscall.EPOLLRDHUP == 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd2, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, fd2, event2)
MustNil(t, err)
err = syscall.Close(fd2) // close fd2
MustNil(t, err)
Expand All @@ -310,7 +309,7 @@ func TestEpollConnectSameFD(t *testing.T) {
t.Logf("create fd: %d", fd3)
err = syscall.SetNonblock(fd3, true)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd3, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd3, event1)
MustNil(t, err)
err = syscall.Connect(fd3, &addr)
t.Log(err)
Expand All @@ -320,7 +319,7 @@ func TestEpollConnectSameFD(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLRDHUP == 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, fd3, eventin)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, fd3, eventin)
MustNil(t, err)
err = syscall.Close(fd3) // close fd3
MustNil(t, err)
Expand All @@ -329,6 +328,55 @@ func TestEpollConnectSameFD(t *testing.T) {
Assert(t, n == 0)
}

func TestRuntimeNetpoller(t *testing.T) {
pfd, err := openPollFile()
MustNil(t, err)

pd, errno := runtime_pollOpen(uintptr(pfd))
Assert(t, errno == 0, errno)
t.Logf("poll open success: pd=%d", pd)

var rfd, wfd = GetSysFdPairs()

eventin := &epollevent{
events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR,
data: [8]byte{0, 0, 0, 0, 0, 0, 0, 1},
}
err = EpollCtl(pfd, syscall.EPOLL_CTL_ADD, rfd, eventin)
MustNil(t, err)

go func() {
time.Sleep(time.Millisecond * 100)

iovec := [1]syscall.Iovec{}
buf := []byte("hello")
n, err := writev(wfd, [][]byte{buf}, iovec[:])
MustNil(t, err)
Equal(t, n, 5)
t.Logf("poll read success: %s", string(buf[:n]))
}()

begin := time.Now()
errno = runtime_pollWait(pd, 'r'+'w')
Assert(t, errno == 0, errno)
cost := time.Since(begin)
Assert(t, cost.Milliseconds() >= 100)

events := make([]epollevent, 1)
n, err := EpollWait(pfd, events, 0)
MustNil(t, err)
Equal(t, n, 1)
t.Logf("poll wait success")

iovec := [1]syscall.Iovec{}
buf := make([]byte, 1024)
bs := [1][]byte{buf}
n, err = readv(rfd, bs[:], iovec[:])
MustNil(t, err)
Equal(t, n, 5)
t.Logf("poll read success: %s", string(buf[:n]))
}

func epollWaitUntil(epfd int, events []epollevent, msec int) (n int, err error) {
WAIT:
n, err = EpollWait(epfd, events, msec)
Expand Down
2 changes: 1 addition & 1 deletion poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var pollmanager *manager
var logger *log.Logger

func init() {
pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1)
pollmanager = newManager(defaultPollNum())
setLoggerOutput(os.Stderr)
}

Expand Down
28 changes: 28 additions & 0 deletions runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package netpoll

import (
_ "unsafe"
)

//go:linkname runtime_pollOpen internal/poll.runtime_pollOpen
func runtime_pollOpen(fd uintptr) (pd uintptr, errno int)

//go:linkname runtime_pollWait internal/poll.runtime_pollWait
func runtime_pollWait(pd uintptr, mode int) (errno int)

//go:linkname runtime_pollReset internal/poll.runtime_pollReset
func runtime_pollReset(pd uintptr, mode int) (errno int)

0 comments on commit 6039df7

Please sign in to comment.