Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat: io_uring for netpoll I/O poller #197

Open
wants to merge 67 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
d766619
feat: add variable parameters to manager.Run() for PollType of manage…
Aug 24, 2022
0227947
feat: re-design openPoll() to select one PollType
Aug 24, 2022
bb5ef53
feat: add IOURingPoll (WIP)
Aug 25, 2022
16074e4
feat: add sysMmap & sysMunmap
Aug 25, 2022
00ee563
fix: rename io_uring to uring
Aug 27, 2022
04cd494
fix: uniform variable r to u
Aug 29, 2022
a940bfa
feat: add const for mmap
Aug 29, 2022
a375d8c
feat: add setup, enter & register for system call
Aug 29, 2022
3e90d2b
feat: add setup options
Aug 29, 2022
8db732f
feat: add SQEntry & CQEvent
Aug 29, 2022
ea33f82
feat: add atomic operation for barrier
Aug 29, 2022
fda4b12
feat: add probe supported capability
Aug 29, 2022
7f9768b
feat: add advance usage for register
Aug 29, 2022
fdc0e5b
feat: add uring for low-level interface
Aug 29, 2022
4c5b712
feat: add submission operations
Aug 29, 2022
641d782
feat: add completion operations
Aug 29, 2022
8e81931
fix: restructure URingSQE & URingCQE
Sep 7, 2022
1057583
fix: wrap mmap & unmmap, recovery syscall.MAP_POPULATE
Sep 7, 2022
a02b287
faet: public getOp by Op
Sep 7, 2022
58d195d
fix: update sysRegister to SysRegister
Sep 7, 2022
4fab811
feat: public syscall
Sep 7, 2022
e3f4b83
feat: restructure uring methord
Sep 7, 2022
392c003
fix: remove sys_barrier.go
Sep 7, 2022
5ad8bd6
fix: Copyright 2022 CloudWeGo Authors
Sep 7, 2022
61f407b
fix: rollback poll_default_* & poll_manager
Sep 8, 2022
c82d419
feat: restructure URingCQE, Error & rename setData to setUserData
Sep 14, 2022
6e445f8
fix: cal size
Sep 14, 2022
2e064d6
feat: add sys_operation
Sep 14, 2022
18aac13
feat: const _size* & fix Sys*
Sep 14, 2022
546b5cd
fix: correct methods
Sep 14, 2022
4386f8c
fix: restructure submit*
Sep 14, 2022
28cc8eb
feat: add Queue & fix others
Sep 14, 2022
a4ffc46
feat: add test-coverage at 65.6% with bad TestTimeoutWait
Sep 14, 2022
3816272
fix: correct import
Sep 14, 2022
dbdf554
fix: add timeout check for WaitCQEs
Sep 21, 2022
f95b7f6
fix: simplify Syscall6 for SysEnter
Sep 21, 2022
b6334f4
feat: add acceptOp
Sep 21, 2022
72137f0
fix: rename OpCode to OpFlag
Sep 21, 2022
32a14c0
feat: add cat example
Sep 21, 2022
53df5fc
feat: add server example
Sep 21, 2022
f86c99a
Merge branch 'cloudwego:develop' into feat/io_uring
Oct 10, 2022
cdbc94f
fix: rename cq.kRingMsk to cq.kRingMask, add annotation
Oct 12, 2022
c1ec061
feat: update peekBatchCQE & peekCQE, and add getEvents
Oct 12, 2022
6a4d7e2
fix: rename cq.kRingMsk to cq.kRingMask
Oct 12, 2022
e9ff6a3
feat: add benckmark for uring & epoll
Oct 12, 2022
7c65c95
Merge branch 'feat/io_uring' of github.com:Jacob953/netpoll into feat…
Oct 12, 2022
46ca4cf
feat: add memory_barrier
Oct 12, 2022
83cc3e2
fix: TestTimeoutWait not supported
Oct 13, 2022
93dda0b
fix: update SMP_MEMORY_BARRIER
Oct 13, 2022
625c0c4
feat: implement netpoll poller register
Oct 13, 2022
0013936
fix: rm poll_io_uring.go for restructuring
Oct 13, 2022
302e5be
fix: rm SMP_SQRING.Store() at SysSetup
Oct 13, 2022
03248a3
fix: openPoll segmentation violation
Oct 13, 2022
5e0b446
fix: simplify
Oct 14, 2022
4d8abb9
feat: modify OpFlag
Oct 14, 2022
ac1a9d8
feat: add RegisterURingPoll
Oct 14, 2022
e1a8711
feat: update pollRegister
Oct 14, 2022
9dbdc9d
feat: update go version to 1.17 for unsafe.Add
Oct 14, 2022
d2b3966
feat: add register for event, rm opflag
Oct 15, 2022
5a6033f
feat: add PollAdd & PollRemove
Oct 15, 2022
4608460
feat: add uring poller
Oct 15, 2022
0737179
fix: rm fmt(for test)
Oct 17, 2022
ef05b54
feat: add URingEpollCtl
Oct 17, 2022
326225e
fix: restructure uringpoll
Oct 17, 2022
861cfa8
fix: check trig and exit first
Oct 17, 2022
dbd117a
fix: add PollAdd for listen
Oct 17, 2022
cfc5b9a
feat: restructure Control
Oct 17, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
module github.com/cloudwego/netpoll

go 1.15
go 1.17

require github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7
require (
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 h1:UiNENfZ8gDvpiWw7IpOMQ27spWmThO1RwwdQVbJahJM=
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
6 changes: 1 addition & 5 deletions poll_default_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ import (
"unsafe"
)

func openPoll() Poll {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poll_xxx 这些先删掉吧,这里需要开发注册机制,不通过 type 区分

return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
func openDefaultPoll() Poll {
l := new(defaultPoll)
p, err := syscall.Kqueue()
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ import (
"unsafe"
)

// Includes defaultPoll/multiPoll/uringPoll...
func openPoll() Poll {
return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
func openDefaultPoll() Poll {
var poll = defaultPoll{}
poll.buf = make([]byte, 8)
var p, err = syscall.EpollCreate1(0)
Expand Down
5 changes: 5 additions & 0 deletions poll_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"runtime"
)

// Includes defaultPoll/multiPoll/uringPoll...
func openPoll() Poll {
return pollRegister()
}

func setNumLoops(numLoops int) error {
return pollmanager.SetNumLoops(numLoops)
}
Expand Down
7 changes: 1 addition & 6 deletions poll_race_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ import (
"syscall"
)

// mock no race poll
func openPoll() Poll {
return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
func openDefaultPoll() Poll {
l := new(defaultPoll)
p, err := syscall.Kqueue()
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions poll_race_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ import (
"syscall"
)

// mock no race poll
func openPoll() Poll {
return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
func openDefaultPoll() Poll {
var poll = defaultPoll{}
poll.buf = make([]byte, 8)
var p, err = syscall.EpollCreate1(0)
Expand Down
31 changes: 31 additions & 0 deletions poll_register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package netpoll

// registerPoll is the func of openning Poller
var pollRegister = openDefaultPoll

// RegisterEpoll implement Epoll
func RegisterEpoll() {
pollRegister = openDefaultPoll
}

// RegisterURingPoll implement URing Poller
func RegisterURingPoll() {
pollRegister = openURingPoll
}
250 changes: 250 additions & 0 deletions poll_uring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !race
// +build !race

package netpoll

import (
"log"
"sync/atomic"
"syscall"
"unsafe"

. "github.com/cloudwego/netpoll/uring"
)

func openURingPoll() Poll {
poll := &uringPoll{}
uring, err := IOURing(128)
if err != nil {
panic(err)
}
poll.uring = uring
return poll
}

type uringPoll struct {
size int
caps int
trigger uint32

uring *URing
cqes []*URingCQE
barriers []barrier
hups []func(p Poll) error
}

// Wait implements Poll.
func (p *uringPoll) Wait() error {
// init
var caps, n = barriercap, 0
p.reset(128, caps)
// wait
for {
if n == p.size && p.size < 128*1024 {
p.reset(p.size<<1, caps)
}
n = p.uring.PeekBatchCQE(p.cqes)
if n == 0 {
continue
}
p.uring.Advance(uint32(n))
if p.handler(p.cqes[:n]) {
return nil
}
}
}

// Close implements Poll.
func (p *uringPoll) Close() error {
var userData uint64
*(**FDOperator)(unsafe.Pointer(&userData)) = &FDOperator{FD: p.uring.Fd(), state: -1}
err := p.trig(userData)
return err
}

// Trigger implements Poll.
func (p *uringPoll) Trigger() error {
if atomic.AddUint32(&p.trigger, 1) > 1 {
return nil
}
var userData uint64
*(**FDOperator)(unsafe.Pointer(&userData)) = &FDOperator{FD: p.uring.Fd()}
err := p.trig(userData)
return err
}

// Control implements Poll.
func (p *uringPoll) Control(operator *FDOperator, event PollEvent) (err error) {
var pollOp Op
var mask uint32
switch event {
case PollReadable:
operator.inuse()
mask = syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR
pollOp = PollAdd(uintptr(operator.FD), mask)
case PollModReadable:
operator.inuse()
mask = syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR
pollOp = PollAdd(uintptr(operator.FD), mask)
case PollDetach:
pollOp = PollRemove(uint64(uintptr(unsafe.Pointer(operator))))
case PollWritable:
operator.inuse()
mask = EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR
pollOp = PollAdd(uintptr(operator.FD), mask)
case PollR2RW:
mask = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR
pollOp = PollAdd(uintptr(operator.FD), mask)
case PollRW2R:
mask = syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR
pollOp = PollAdd(uintptr(operator.FD), mask)
}

var userData uint64
*(**FDOperator)(unsafe.Pointer(&userData)) = operator

err = p.uring.Queue(pollOp, 0, userData)
if err != nil {
panic(err)
}

_, err = p.uring.Submit()
return err
}

func (p *uringPoll) reset(size, caps int) {
p.size, p.caps = size, caps
p.cqes, p.barriers = make([]*URingCQE, size), make([]barrier, size)
for i := range p.barriers {
p.barriers[i].bs = make([][]byte, caps)
p.barriers[i].ivs = make([]syscall.Iovec, caps)
}
}

func (p *uringPoll) handler(cqes []*URingCQE) (closed bool) {
for i := range cqes {
var operator = *(**FDOperator)(unsafe.Pointer(&cqes[i].UserData))
// trigger or exit gracefully
if operator.FD == p.uring.Fd() {
// must clean trigger first
atomic.StoreUint32(&p.trigger, 0)
// if closed & exit
if operator.state == -1 {
p.uring.Close()
return true
}
operator.done()
continue
}

if !operator.do() {
continue
}

var events = cqes[i].Res
// check poll in
if events&syscall.EPOLLIN != 0 {
if operator.OnRead != nil {
// for non-connection
operator.OnRead(p)
} else {
// only for connection
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
operator.InputAck(n)
if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
p.appendHup(operator)
continue
}
}
}
}

// check hup
if events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0 {
p.appendHup(operator)
continue
}
if events&syscall.EPOLLERR != 0 {
// Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
// So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
p.appendHup(operator)
} else {
operator.done()
}
continue
}

// check poll out
if events&syscall.EPOLLOUT != 0 {
if operator.OnWrite != nil {
// for non-connection
operator.OnWrite(p)
} else {
// only for connection
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) > 0 {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
operator.OutputAck(n)
if err != nil && err != syscall.EAGAIN {
log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error())
p.appendHup(operator)
continue
}
}
}
}
operator.done()
}
// hup conns together to avoid blocking the poll.
p.detaches()
return false
}

func (p *uringPoll) trig(userData uint64) error {
err := p.uring.Queue(Nop(), 0, userData)
if err != nil {
return err
}
_, err = p.uring.Submit()
return err
}

func (p *uringPoll) appendHup(operator *FDOperator) {
p.hups = append(p.hups, operator.OnHup)
operator.Control(PollDetach)
operator.done()
}

func (p *uringPoll) detaches() {
if len(p.hups) == 0 {
return
}
hups := p.hups
p.hups = nil
go func(onhups []func(p Poll) error) {
for i := range onhups {
if onhups[i] != nil {
onhups[i](p)
}
}
}(hups)
}
Loading