Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: epollfd managered by runtime netpoller #319

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion poll_default_bsd.go
Original file line number Diff line number Diff line change
@@ -19,19 +19,28 @@ package netpoll

import (
"errors"
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)

func defaultPollNum() int {
return runtime.GOMAXPROCS(0)/20 + 1
}

func openPollFile() (int, error) {
return syscall.Kqueue()
}

func openPoll() (Poll, error) {
return openDefaultPoll()
}

func openDefaultPoll() (*defaultPoll, error) {
l := new(defaultPoll)
p, err := syscall.Kqueue()
p, err := openPollFile()
if err != nil {
return nil, err
}
37 changes: 30 additions & 7 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
@@ -16,13 +16,24 @@ package netpoll

import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)

func defaultPollNum() int {
// more pollers could help poller schedule user work to different P to increase parallelism,
// but also will decrease poller's epoll efficiency, so it's a trade-off
return runtime.GOMAXPROCS(0) / 2
}

func openPollFile() (int, error) {
return EpollCreate(0)
}

func openPoll() (Poll, error) {
return openDefaultPoll()
}
@@ -31,11 +42,17 @@ func openDefaultPoll() (*defaultPoll, error) {
var poll = new(defaultPoll)

poll.buf = make([]byte, 8)
var p, err = EpollCreate(0)
var p, err = openPollFile()
if err != nil {
return nil, err
}
poll.fd = p
// register epollfd into runtime's netpoller
pd, errno := runtime_pollOpen(uintptr(poll.fd))
if errno != 0 {
return nil, Exception(ErrUnsupported, fmt.Sprintf("when poll open: errno=%d", errno))
}
poll.pd = pd

var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
if e0 != 0 {
@@ -60,6 +77,7 @@ func openDefaultPoll() (*defaultPoll, error) {
type defaultPoll struct {
pollArgs
fd int // epoll fd
pd uintptr // the pollDesc of epoll fd in runtime's netpoller
wop *FDOperator // eventfd, wake epoll_wait
buf []byte // read wfd trigger msg
trigger uint32 // trigger flag
@@ -90,23 +108,28 @@ func (a *pollArgs) reset(size, caps int) {
// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
// init
var caps, msec, n = barriercap, -1, 0
var caps, n = barriercap, 0
p.Reset(128, caps)
// wait
for {
if n == p.size && p.size < 128*1024 {
p.Reset(p.size<<1, caps)
}
n, err = EpollWait(p.fd, p.events, msec)
n, err = EpollWait(p.fd, p.events, 0)
if err != nil && err != syscall.EINTR {
return err
}
if n <= 0 {
msec = -1
runtime.Gosched()
if n == 0 {
errno := runtime_pollReset(p.pd, 'r')
if errno != 0 {
return Exception(ErrUnsupported, fmt.Sprintf("when poll reset: errno=%d", errno))
}
errno = runtime_pollWait(p.pd, 'r')
if errno != 0 {
return Exception(ErrUnsupported, fmt.Sprintf("when poll wait: errno=%d", errno))
}
continue
}
msec = 0
if p.Handler(p.events[:n]) {
return nil
}
200 changes: 177 additions & 23 deletions poll_default_linux_test.go
Original file line number Diff line number Diff line change
@@ -21,8 +21,7 @@ import (
"errors"
"syscall"
"testing"

"golang.org/x/sys/unix"
"time"
)

func TestEpollEvent(t *testing.T) {
@@ -54,11 +53,11 @@ func TestEpollEvent(t *testing.T) {
}

// EPOLL: add ,del and add
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event1)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event1)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event2)
MustNil(t, err)
_, err = syscall.Write(wfd, send)
MustNil(t, err)
@@ -68,15 +67,15 @@ func TestEpollEvent(t *testing.T) {
Equal(t, events[0].data, eventdata2)
_, err = syscall.Read(rfd, recv)
MustTrue(t, err == nil && string(recv) == string(send))
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event2)
MustNil(t, err)

// EPOLL: add ,mod and mod
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event1)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, rfd, event2)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, rfd, event3)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, rfd, event3)
MustNil(t, err)
_, err = syscall.Write(wfd, send)
MustNil(t, err)
@@ -88,7 +87,7 @@ func TestEpollEvent(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLIN != 0)
Assert(t, events[0].events&syscall.EPOLLOUT != 0)

err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event2)
MustNil(t, err)
}

@@ -110,7 +109,7 @@ func TestEpollWait(t *testing.T) {
events: syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR,
data: eventdata,
}
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
MustNil(t, err)
_, err = epollWaitUntil(epollfd, events, -1)
MustNil(t, err)
@@ -146,7 +145,7 @@ func TestEpollWait(t *testing.T) {
// EPOLL: close current fd
rfd2, wfd2 := GetSysFdPairs()
defer syscall.Close(wfd2)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd2, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd2, event)
err = syscall.Close(rfd2)
MustNil(t, err)
_, err = epollWaitUntil(epollfd, events, -1)
@@ -156,7 +155,7 @@ func TestEpollWait(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLRDHUP != 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)

err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event)
MustNil(t, err)
}

@@ -173,7 +172,7 @@ func TestEpollETClose(t *testing.T) {
}

// EPOLL: init state
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
_, err = epollWaitUntil(epollfd, events, -1)
MustNil(t, err)
Assert(t, events[0].events&syscall.EPOLLIN == 0)
@@ -194,7 +193,7 @@ func TestEpollETClose(t *testing.T) {
// EPOLL: close peer fd
// EPOLLIN and EPOLLOUT
rfd, wfd = GetSysFdPairs()
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
err = syscall.Close(wfd)
MustNil(t, err)
n, err = epollWaitUntil(epollfd, events, 100)
@@ -224,10 +223,10 @@ func TestEpollETDel(t *testing.T) {
}

// EPOLL: del partly
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, rfd, event)
MustNil(t, err)
event.events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, rfd, event)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, rfd, event)
MustNil(t, err)
_, err = syscall.Write(wfd, send)
MustNil(t, err)
@@ -268,7 +267,7 @@ func TestEpollConnectSameFD(t *testing.T) {
t.Logf("create fd: %d", fd1)
err = syscall.SetNonblock(fd1, true)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd1, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd1, event1)
MustNil(t, err)
err = syscall.Connect(fd1, &addr)
t.Log(err)
@@ -278,7 +277,7 @@ func TestEpollConnectSameFD(t *testing.T) {
//Assert(t, events[0].events&syscall.EPOLLRDHUP == 0)
//Assert(t, events[0].events&syscall.EPOLLERR == 0)
// forget to del fd
//err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1)
//err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, fd1, event1)
//MustNil(t, err)
err = syscall.Close(fd1) // close fd1
MustNil(t, err)
@@ -289,7 +288,7 @@ func TestEpollConnectSameFD(t *testing.T) {
t.Logf("create fd: %d", fd2)
err = syscall.SetNonblock(fd2, true)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd2, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd2, event2)
MustNil(t, err)
err = syscall.Connect(fd2, &addr)
t.Log(err)
@@ -298,7 +297,7 @@ func TestEpollConnectSameFD(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLOUT != 0)
Assert(t, events[0].events&syscall.EPOLLRDHUP == 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)
err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd2, event2)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_DEL, fd2, event2)
MustNil(t, err)
err = syscall.Close(fd2) // close fd2
MustNil(t, err)
@@ -310,7 +309,7 @@ func TestEpollConnectSameFD(t *testing.T) {
t.Logf("create fd: %d", fd3)
err = syscall.SetNonblock(fd3, true)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd3, event1)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_ADD, fd3, event1)
MustNil(t, err)
err = syscall.Connect(fd3, &addr)
t.Log(err)
@@ -320,7 +319,7 @@ func TestEpollConnectSameFD(t *testing.T) {
Assert(t, events[0].events&syscall.EPOLLRDHUP == 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)
MustNil(t, err)
err = EpollCtl(epollfd, unix.EPOLL_CTL_MOD, fd3, eventin)
err = EpollCtl(epollfd, syscall.EPOLL_CTL_MOD, fd3, eventin)
MustNil(t, err)
err = syscall.Close(fd3) // close fd3
MustNil(t, err)
@@ -329,6 +328,161 @@ func TestEpollConnectSameFD(t *testing.T) {
Assert(t, n == 0)
}

func TestEpollWaitEpollFD(t *testing.T) {
epollfd1, err := EpollCreate(0) // monitor epollfd2
MustNil(t, err)
epollfd2, err := EpollCreate(0) // monitor io fds
MustNil(t, err)
MustNil(t, err)
defer syscall.Close(epollfd1)
defer syscall.Close(epollfd2)

rfd, wfd := GetSysFdPairs()
defer syscall.Close(wfd)
send := []byte("hello")
recv := make([]byte, 5)
events := make([]epollevent, 128)
n := 0

// register epollfd2 into epollfd1
epevent := &epollevent{
// netpollopen: runtime/netpoll_epoll.go
events: syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | EPOLLET,
data: [8]byte{},
}
err = EpollCtl(epollfd1, syscall.EPOLL_CTL_ADD, epollfd2, epevent)
MustNil(t, err)
n, err = epollWaitUntil(epollfd1, events, 0)
Equal(t, n, 0)
MustNil(t, err)

// register rfd into epollfd2
ioevent := &epollevent{
events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR,
data: [8]byte{},
}
err = EpollCtl(epollfd2, syscall.EPOLL_CTL_ADD, rfd, ioevent)
MustNil(t, err)
n, err = epollWaitUntil(epollfd2, events, 0)
Equal(t, n, 0)
MustNil(t, err)

// check epollfd2 readable
n, err = syscall.Write(wfd, send)
Equal(t, n, len(send))
MustNil(t, err)
n, err = epollWaitUntil(epollfd1, events, 0)
Equal(t, n, 1)
MustNil(t, err)
Assert(t, events[0].events&syscall.EPOLLIN != 0)
n, err = epollWaitUntil(epollfd1, events, 0)
Equal(t, n, 1)
MustNil(t, err)

// check rfd readable
n, err = epollWaitUntil(epollfd2, events, 0)
Equal(t, n, 1)
MustNil(t, err)
Assert(t, events[0].events&syscall.EPOLLIN != 0)

// read rfd
n, err = syscall.Read(rfd, recv)
Equal(t, n, len(send))
MustTrue(t, err == nil && string(recv) == string(send))

// check epollfd1 non-readable
n, err = epollWaitUntil(epollfd1, events, 0)
Equal(t, n, 0)
MustNil(t, err)

// check epollfd2 non-readable
n, err = epollWaitUntil(epollfd2, events, 0)
Equal(t, n, 0)
MustNil(t, err)

// close wfd
err = syscall.Close(wfd)
MustNil(t, err)

// check epollfd1 notified when peer closed
n, err = epollWaitUntil(epollfd1, events, 0)
Equal(t, n, 1)
MustNil(t, err)
Assert(t, events[0].events&syscall.EPOLLIN != 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)

// check epollfd2 notified when peer closed
n, err = epollWaitUntil(epollfd2, events, 0)
Equal(t, n, 1)
MustNil(t, err)
Assert(t, events[0].events&syscall.EPOLLIN != 0)
Assert(t, events[0].events&syscall.EPOLLRDHUP != 0)
Assert(t, events[0].events&syscall.EPOLLERR == 0)

// close rfd
err = syscall.Close(rfd)
MustNil(t, err)

// check epollfd1 non-readable
n, err = epollWaitUntil(epollfd1, events, 0)
Equal(t, n, 0)
MustNil(t, err)

// check epollfd2 non-readable
n, err = epollWaitUntil(epollfd2, events, 0)
Equal(t, n, 0)
MustNil(t, err)
}

func TestRuntimeNetpoller(t *testing.T) {
pfd, err := openPollFile()
MustNil(t, err)

pd, errno := runtime_pollOpen(uintptr(pfd))
Assert(t, errno == 0, errno)
t.Logf("poll open success: pd=%d", pd)

var rfd, wfd = GetSysFdPairs()

eventin := &epollevent{
events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR,
data: [8]byte{0, 0, 0, 0, 0, 0, 0, 1},
}
err = EpollCtl(pfd, syscall.EPOLL_CTL_ADD, rfd, eventin)
MustNil(t, err)

go func() {
time.Sleep(time.Millisecond * 100)

iovec := [1]syscall.Iovec{}
buf := []byte("hello")
n, err := writev(wfd, [][]byte{buf}, iovec[:])
MustNil(t, err)
Equal(t, n, 5)
t.Logf("poll read success: %s", string(buf[:n]))
}()

begin := time.Now()
errno = runtime_pollWait(pd, 'r'+'w')
Assert(t, errno == 0, errno)
cost := time.Since(begin)
Assert(t, cost.Milliseconds() >= 100)

events := make([]epollevent, 1)
n, err := EpollWait(pfd, events, 0)
MustNil(t, err)
Equal(t, n, 1)
t.Logf("poll wait success")

iovec := [1]syscall.Iovec{}
buf := make([]byte, 1024)
bs := [1][]byte{buf}
n, err = readv(rfd, bs[:], iovec[:])
MustNil(t, err)
Equal(t, n, 5)
t.Logf("poll read success: %s", string(buf[:n]))
}

func epollWaitUntil(epfd int, events []epollevent, msec int) (n int, err error) {
WAIT:
n, err = EpollWait(epfd, events, msec)
2 changes: 1 addition & 1 deletion poll_manager.go
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ var pollmanager *manager
var logger *log.Logger

func init() {
pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1)
pollmanager = newManager(defaultPollNum())
setLoggerOutput(os.Stderr)
}

28 changes: 28 additions & 0 deletions runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package netpoll

import (
_ "unsafe"
)

//go:linkname runtime_pollOpen internal/poll.runtime_pollOpen
func runtime_pollOpen(fd uintptr) (pd uintptr, errno int)

//go:linkname runtime_pollWait internal/poll.runtime_pollWait
func runtime_pollWait(pd uintptr, mode int) (errno int)

//go:linkname runtime_pollReset internal/poll.runtime_pollReset
func runtime_pollReset(pd uintptr, mode int) (errno int)