Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat: io_uring for netpoll I/O poller #197

Open
wants to merge 67 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
d766619
feat: add variable parameters to manager.Run() for PollType of manage…
Aug 24, 2022
0227947
feat: re-design openPoll() to select one PollType
Aug 24, 2022
bb5ef53
feat: add IOURingPoll (WIP)
Aug 25, 2022
16074e4
feat: add sysMmap & sysMunmap
Aug 25, 2022
00ee563
fix: rename io_uring to uring
Aug 27, 2022
04cd494
fix: uniform variable r to u
Aug 29, 2022
a940bfa
feat: add const for mmap
Aug 29, 2022
a375d8c
feat: add setup, enter & register for system call
Aug 29, 2022
3e90d2b
feat: add setup options
Aug 29, 2022
8db732f
feat: add SQEntry & CQEvent
Aug 29, 2022
ea33f82
feat: add atomic operation for barrier
Aug 29, 2022
fda4b12
feat: add probe supported capability
Aug 29, 2022
7f9768b
feat: add advance usage for register
Aug 29, 2022
fdc0e5b
feat: add uring for low-level interface
Aug 29, 2022
4c5b712
feat: add submission operations
Aug 29, 2022
641d782
feat: add completion operations
Aug 29, 2022
8e81931
fix: restructure URingSQE & URingCQE
Sep 7, 2022
1057583
fix: wrap mmap & unmmap, recovery syscall.MAP_POPULATE
Sep 7, 2022
a02b287
faet: public getOp by Op
Sep 7, 2022
58d195d
fix: update sysRegister to SysRegister
Sep 7, 2022
4fab811
feat: public syscall
Sep 7, 2022
e3f4b83
feat: restructure uring methord
Sep 7, 2022
392c003
fix: remove sys_barrier.go
Sep 7, 2022
5ad8bd6
fix: Copyright 2022 CloudWeGo Authors
Sep 7, 2022
61f407b
fix: rollback poll_default_* & poll_manager
Sep 8, 2022
c82d419
feat: restructure URingCQE, Error & rename setData to setUserData
Sep 14, 2022
6e445f8
fix: cal size
Sep 14, 2022
2e064d6
feat: add sys_operation
Sep 14, 2022
18aac13
feat: const _size* & fix Sys*
Sep 14, 2022
546b5cd
fix: correct methods
Sep 14, 2022
4386f8c
fix: restructure submit*
Sep 14, 2022
28cc8eb
feat: add Queue & fix others
Sep 14, 2022
a4ffc46
feat: add test-coverage at 65.6% with bad TestTimeoutWait
Sep 14, 2022
3816272
fix: correct import
Sep 14, 2022
dbdf554
fix: add timeout check for WaitCQEs
Sep 21, 2022
f95b7f6
fix: simplify Syscall6 for SysEnter
Sep 21, 2022
b6334f4
feat: add acceptOp
Sep 21, 2022
72137f0
fix: rename OpCode to OpFlag
Sep 21, 2022
32a14c0
feat: add cat example
Sep 21, 2022
53df5fc
feat: add server example
Sep 21, 2022
f86c99a
Merge branch 'cloudwego:develop' into feat/io_uring
Oct 10, 2022
cdbc94f
fix: rename cq.kRingMsk to cq.kRingMask, add annotation
Oct 12, 2022
c1ec061
feat: update peekBatchCQE & peekCQE, and add getEvents
Oct 12, 2022
6a4d7e2
fix: rename cq.kRingMsk to cq.kRingMask
Oct 12, 2022
e9ff6a3
feat: add benckmark for uring & epoll
Oct 12, 2022
7c65c95
Merge branch 'feat/io_uring' of github.com:Jacob953/netpoll into feat…
Oct 12, 2022
46ca4cf
feat: add memory_barrier
Oct 12, 2022
83cc3e2
fix: TestTimeoutWait not supported
Oct 13, 2022
93dda0b
fix: update SMP_MEMORY_BARRIER
Oct 13, 2022
625c0c4
feat: implement netpoll poller register
Oct 13, 2022
0013936
fix: rm poll_io_uring.go for restructuring
Oct 13, 2022
302e5be
fix: rm SMP_SQRING.Store() at SysSetup
Oct 13, 2022
03248a3
fix: openPoll segmentation violation
Oct 13, 2022
5e0b446
fix: simplify
Oct 14, 2022
4d8abb9
feat: modify OpFlag
Oct 14, 2022
ac1a9d8
feat: add RegisterURingPoll
Oct 14, 2022
e1a8711
feat: update pollRegister
Oct 14, 2022
9dbdc9d
feat: update go version to 1.17 for unsafe.Add
Oct 14, 2022
d2b3966
feat: add register for event, rm opflag
Oct 15, 2022
5a6033f
feat: add PollAdd & PollRemove
Oct 15, 2022
4608460
feat: add uring poller
Oct 15, 2022
0737179
fix: rm fmt(for test)
Oct 17, 2022
ef05b54
feat: add URingEpollCtl
Oct 17, 2022
326225e
fix: restructure uringpoll
Oct 17, 2022
861cfa8
fix: check trig and exit first
Oct 17, 2022
dbd117a
fix: add PollAdd for listen
Oct 17, 2022
cfc5b9a
feat: restructure Control
Oct 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: add completion operations
  • Loading branch information
Jacob953 committed Aug 29, 2022
commit 641d782abea636220df9ba6f323b7b805d278ed8
255 changes: 255 additions & 0 deletions uring/uring_cmplt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Copyright 2021 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package uring

import (
"math"
"runtime"
"syscall"
"time"
"unsafe"
)

type getData struct {
submit uint32
waitNr uint32
getFlags uint32
sz int
arg unsafe.Pointer
}

type getEventsArg struct {
sigMask uintptr
sigMaskSz uint32
_pad uint32
ts uintptr
}

// WaitCQE implements URing, it returns an I/O CQE, waiting for it if necessary
func (u *URing) WaitCQE() (cqe *URingCQE, err error) {
return u.WaitCQENr(1)
}

// WaitCQENr implements URing, it returns an I/O CQE, waiting for nr completions if one isn’t readily
func (u *URing) WaitCQENr(nr uint32) (cqe *URingCQE, err error) {
return u.getCQE(getData{
submit: 0,
waitNr: nr,
arg: unsafe.Pointer(nil),
})
}

// WaitCQEs implements URing, like WaitCQE() except it accepts a timeout value as well.
// Note that an SQE is used internally to handle the timeout. Applications using this function
// must never set sqe->user_data to LIBURING_UDATA_TIMEOUT.
func (u *URing) WaitCQEs(nr uint32, timeout time.Duration) (*URingCQE, error) {
var toSubmit uint32

if u.Params.flags&IORING_FEAT_EXT_ARG != 0 {
return u.WaitCQEsNew(nr, timeout)
}
toSubmit, err := u.submitTimeout(timeout)
if toSubmit == 0 {
return nil, err
}
return u.getCQE(getData{
submit: toSubmit,
waitNr: nr,
arg: unsafe.Pointer(nil),
sz: NSIG / 8,
})
}

// WaitCQETimeout implements URing, returns an I/O completion,
// if one is readily available. Doesn’t wait.
func (u *URing) WaitCQETimeout(timeout time.Duration) (cqe *URingCQE, err error) {
return u.WaitCQEs(1, timeout)
}

// PeekBatchCQE implements URing, it fills in an array of I/O CQE up to count,
// if they are available, returning the count of completions filled.
// Does not wait for completions. They have to be already available for them to be returned by this function.
func (u *URing) PeekBatchCQE(cqes []*URingCQE) int {
var shift int
if u.Params.flags&IORING_SETUP_CQE32 != 0 {
shift = 1
}

n := u.peekBatchCQE(cqes, shift)

if n == 0 && u.cqRingNeedFlush() {
sysEnter(u.fd, 0, 0, IORING_ENTER_GETEVENTS, nil)
n = u.peekBatchCQE(cqes, shift)
}

return n
}

// CQESeen implements URing, it must be called after PeekCQE() or WaitCQE()
// and after the cqe has been processed by the application.
func (u *URing) CQESeen() {
if u.cqRing.cqes != nil {
u.Advance(1)
}
}

// GetEventsArg implements URing
func GetEventsArg(sigMask uintptr, sigMaskSz uint32, ts uintptr) *getEventsArg {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global func 别混在这里,如果是 private, 小写放最后;public 就放其他文件归类

return &getEventsArg{sigMask: sigMask, sigMaskSz: sigMaskSz, ts: ts}
}

// WaitCQEsNew implements URing
func (u *URing) WaitCQEsNew(nr uint32, timeout time.Duration) (cqe *URingCQE, err error) {
ts := syscall.NsecToTimespec(timeout.Nanoseconds())
arg := GetEventsArg(uintptr(unsafe.Pointer(nil)), NSIG/8, uintptr(unsafe.Pointer(&ts)))

cqe, err = u.getCQE(getData{
submit: 0,
waitNr: nr,
getFlags: IORING_ENTER_EXT_ARG,
arg: unsafe.Pointer(arg),
sz: int(unsafe.Sizeof(getEventsArg{})),
})

runtime.KeepAlive(arg)
runtime.KeepAlive(ts)
return
}

// getCQE implements URing
func (u *URing) getCQE(data getData) (cqe *URingCQE, err error) {
for {
var looped, needEnter bool
var flags, nrAvail uint32

nrAvail, cqe, err = u.peekCQE()

if err != nil {
break
}
if cqe == nil && data.waitNr == 0 && data.submit == 0 {
// If we already looped once, we already entererd
// the kernel. Since there's nothing to submit or
// wait for, don't keep retrying.
if looped || u.caRingNeedEnter() {
err = syscall.EAGAIN
break
}
needEnter = true
}

if data.waitNr > nrAvail || nrAvail != 0 {
flags = IORING_ENTER_GETEVENTS | data.getFlags
needEnter = true
}

if data.submit != 0 && u.sqRingNeedEnter(&flags) {
needEnter = true
}
if !needEnter {
break
}

if u.Params.flags&INT_FLAG_REG_RING != 0 {
flags |= IORING_ENTER_REGISTERED_RING
}

var ret uint
ret, err = sysEnter6(u.fd, data.submit, data.waitNr, flags, data.arg, data.sz)

if err != nil {
break
}

data.submit -= uint32(ret)
if cqe != nil {
break
}
looped = true

}
return
}

// submitTimeout implements URing
func (u *URing) submitTimeout(timeout time.Duration) (uint32, error) {
sqe, err := u.nextSQE()
if err != nil {
_, err = u.Submit()
if err != nil {
return 0, err
}
sqe, err = u.nextSQE()
if err != nil {
return uint32(syscall.EAGAIN), err
}
}
Timeout(timeout).Prep(sqe)
sqe.setData(LIBURING_UDATA_TIMEOUT)
return u.flushSQ(), nil
}

// peekCQE implements URing
func (u *URing) peekCQE() (avail uint32, cqe *URingCQE, err error) {
mask := *u.cqRing.kRingMask

var shift int
if u.Params.flags&IORING_SETUP_CQE32 != 0 {
shift = 1
}

for {
tail := SMP_LOAD_ACQUIRE_U32(u.cqRing.kTail)
head := SMP_LOAD_ACQUIRE_U32(u.cqRing.kHead)

avail = tail - head
if avail == 0 {
break
}
cqe = (*URingCQE)(unsafe.Add(unsafe.Pointer(u.cqRing.cqes), uintptr((head&mask)<<shift)*unsafe.Sizeof(URing{})))

if !(u.Params.features&IORING_FEAT_EXT_ARG == 0) && cqe.UserData == LIBURING_UDATA_TIMEOUT {
if cqe.Res < 0 {
err = cqe.Error()
}
u.Advance(1)
if err != nil {
continue
}
err = nil
}
}

return
}

// peekCQE implements URing
func (u *URing) peekBatchCQE(cqes []*URingCQE, shift int) int {
ready := u.cqRing.ready()
count := min(uint32(len(cqes)), ready)
if ready != 0 {
head := SMP_LOAD_ACQUIRE_U32(u.cqRing.kHead)
mask := SMP_LOAD_ACQUIRE_U32(u.cqRing.kRingMask)

last := head + count
for i := 0; head != last; head, i = head+1, i+1 {
cqes[i] = (*URingCQE)(unsafe.Add(unsafe.Pointer(u.cqRing.cqes), uintptr((head&mask)<<shift)*unsafe.Sizeof(URingCQE{})))
}
}
return int(count)
}

const INT_FLAG_REG_RING = 1
const LIBURING_UDATA_TIMEOUT = math.MaxUint64