diff --git a/go.mod b/go.mod
index 800411e6..c811ccd4 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,8 @@
 module github.com/cloudwego/netpoll
 
-go 1.15
+go 1.17
 
-require github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7
+require (
+	github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7
+	golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64
+)
diff --git a/go.sum b/go.sum
index deaed30b..bb1ca641 100644
--- a/go.sum
+++ b/go.sum
@@ -9,6 +9,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 h1:UiNENfZ8gDvpiWw7IpOMQ27spWmThO1RwwdQVbJahJM=
+golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
diff --git a/poll_default_bsd.go b/poll_default_bsd.go
index ec8f070c..94ab4406 100644
--- a/poll_default_bsd.go
+++ b/poll_default_bsd.go
@@ -25,11 +25,7 @@ import (
 	"unsafe"
 )
 
-func openPoll() Poll {
-	return openDefaultPoll()
-}
-
-func openDefaultPoll() *defaultPoll {
+func openDefaultPoll() Poll {
 	l := new(defaultPoll)
 	p, err := syscall.Kqueue()
 	if err != nil {
diff --git a/poll_default_linux.go b/poll_default_linux.go
index c31a43a0..4e92ef0b 100644
--- a/poll_default_linux.go
+++ b/poll_default_linux.go
@@ -25,12 +25,7 @@ import (
 	"unsafe"
 )
 
-// Includes defaultPoll/multiPoll/uringPoll...
-func openPoll() Poll {
-	return openDefaultPoll()
-}
-
-func openDefaultPoll() *defaultPoll {
+func openDefaultPoll() Poll {
 	var poll = defaultPoll{}
 	poll.buf = make([]byte, 8)
 	var p, err = syscall.EpollCreate1(0)
diff --git a/poll_manager.go b/poll_manager.go
index 398e7a6e..34f8469b 100644
--- a/poll_manager.go
+++ b/poll_manager.go
@@ -23,6 +23,11 @@ import (
 	"runtime"
 )
 
+// Includes defaultPoll/multiPoll/uringPoll...
+func openPoll() Poll {
+	return pollRegister()
+}
+
 func setNumLoops(numLoops int) error {
 	return pollmanager.SetNumLoops(numLoops)
 }
diff --git a/poll_race_bsd.go b/poll_race_bsd.go
index 39b2d7e6..bd028bc9 100644
--- a/poll_race_bsd.go
+++ b/poll_race_bsd.go
@@ -25,12 +25,7 @@ import (
 	"syscall"
 )
 
-// mock no race poll
-func openPoll() Poll {
-	return openDefaultPoll()
-}
-
-func openDefaultPoll() *defaultPoll {
+func openDefaultPoll() Poll {
 	l := new(defaultPoll)
 	p, err := syscall.Kqueue()
 	if err != nil {
diff --git a/poll_race_linux.go b/poll_race_linux.go
index da28cd49..16ef3660 100644
--- a/poll_race_linux.go
+++ b/poll_race_linux.go
@@ -25,12 +25,7 @@ import (
 	"syscall"
 )
 
-// mock no race poll
-func openPoll() Poll {
-	return openDefaultPoll()
-}
-
-func openDefaultPoll() *defaultPoll {
+func openDefaultPoll() Poll {
 	var poll = defaultPoll{}
 	poll.buf = make([]byte, 8)
 	var p, err = syscall.EpollCreate1(0)
diff --git a/poll_register.go b/poll_register.go
new file mode 100644
index 00000000..e8c5c2f2
--- /dev/null
+++ b/poll_register.go
@@ -0,0 +1,31 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !windows
+// +build !windows
+
+package netpoll
+
+// pollRegister is the function used to open a new Poller.
+var pollRegister = openDefaultPoll
+
+// RegisterEpoll registers the default (epoll-based) Poller implementation.
+func RegisterEpoll() {
+	pollRegister = openDefaultPoll
+}
+
+// RegisterURingPoll registers the io_uring-based Poller implementation.
+func RegisterURingPoll() {
+	pollRegister = openURingPoll
+}
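For reference, a minimal sketch of how an application might opt in to the new poller. This is an assumption-laden example, not part of the patch: it assumes netpoll's usual NewEventLoop/Serve flow, that RegisterURingPoll runs before any Poll is opened by the poll manager, and a non-race, non-Windows build (the only builds that compile the io_uring poller).

package main

import (
	"context"
	"net"

	"github.com/cloudwego/netpoll"
)

func main() {
	// Swap the poller factory before netpoll opens any pollers; whether
	// this is early enough depends on when the poll manager runs.
	netpoll.RegisterURingPoll()

	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}
	// A plain echo handler; any OnRequest works the same way.
	loop, err := netpoll.NewEventLoop(func(ctx context.Context, conn netpoll.Connection) error {
		reader, writer := conn.Reader(), conn.Writer()
		buf, err := reader.Next(reader.Len())
		if err != nil {
			return err
		}
		if _, err := writer.WriteBinary(buf); err != nil {
			return err
		}
		return writer.Flush()
	})
	if err != nil {
		panic(err)
	}
	_ = loop.Serve(ln)
}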
diff --git a/poll_uring.go b/poll_uring.go
new file mode 100644
index 00000000..0c078000
--- /dev/null
+++ b/poll_uring.go
@@ -0,0 +1,250 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !race
+// +build !race
+
+package netpoll
+
+import (
+	"log"
+	"sync/atomic"
+	"syscall"
+	"unsafe"
+
+	. "github.com/cloudwego/netpoll/uring"
+)
+
+func openURingPoll() Poll {
+	poll := &uringPoll{}
+	uring, err := IOURing(128)
+	if err != nil {
+		panic(err)
+	}
+	poll.uring = uring
+	return poll
+}
+
+type uringPoll struct {
+	size    int
+	caps    int
+	trigger uint32
+
+	uring    *URing
+	cqes     []*URingCQE
+	barriers []barrier
+	hups     []func(p Poll) error
+}
+
+// Wait implements Poll.
+func (p *uringPoll) Wait() error {
+	// init
+	var caps, n = barriercap, 0
+	p.reset(128, caps)
+	// wait
+	for {
+		if n == p.size && p.size < 128*1024 {
+			p.reset(p.size<<1, caps)
+		}
+		n = p.uring.PeekBatchCQE(p.cqes)
+		if n == 0 {
+			continue
+		}
+		p.uring.Advance(uint32(n))
+		if p.handler(p.cqes[:n]) {
+			return nil
+		}
+	}
+}
+
+// Close implements Poll.
+func (p *uringPoll) Close() error {
+	var userData uint64
+	*(**FDOperator)(unsafe.Pointer(&userData)) = &FDOperator{FD: p.uring.Fd(), state: -1}
+	err := p.trig(userData)
+	return err
+}
+
+// Trigger implements Poll.
+func (p *uringPoll) Trigger() error {
+	if atomic.AddUint32(&p.trigger, 1) > 1 {
+		return nil
+	}
+	var userData uint64
+	*(**FDOperator)(unsafe.Pointer(&userData)) = &FDOperator{FD: p.uring.Fd()}
+	err := p.trig(userData)
+	return err
+}
+
+// Control implements Poll.
+func (p *uringPoll) Control(operator *FDOperator, event PollEvent) (err error) {
+	var pollOp Op
+	var mask uint32
+	switch event {
+	case PollReadable:
+		operator.inuse()
+		mask = syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR
+		pollOp = PollAdd(uintptr(operator.FD), mask)
+	case PollModReadable:
+		operator.inuse()
+		mask = syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR
+		pollOp = PollAdd(uintptr(operator.FD), mask)
+	case PollDetach:
+		pollOp = PollRemove(uint64(uintptr(unsafe.Pointer(operator))))
+	case PollWritable:
+		operator.inuse()
+		mask = EPOLLET | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR
+		pollOp = PollAdd(uintptr(operator.FD), mask)
+	case PollR2RW:
+		mask = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLERR
+		pollOp = PollAdd(uintptr(operator.FD), mask)
+	case PollRW2R:
+		mask = syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR
+		pollOp = PollAdd(uintptr(operator.FD), mask)
+	}
+
+	var userData uint64
+	*(**FDOperator)(unsafe.Pointer(&userData)) = operator
+
+	err = p.uring.Queue(pollOp, 0, userData)
+	if err != nil {
+		panic(err)
+	}
+
+	_, err = p.uring.Submit()
+	return err
+}
+
+func (p *uringPoll) reset(size, caps int) {
+	p.size, p.caps = size, caps
+	p.cqes, p.barriers = make([]*URingCQE, size), make([]barrier, size)
+	for i := range p.barriers {
+		p.barriers[i].bs = make([][]byte, caps)
+		p.barriers[i].ivs = make([]syscall.Iovec, caps)
+	}
+}
+
+func (p *uringPoll) handler(cqes []*URingCQE) (closed bool) {
+	for i := range cqes {
+		var operator = *(**FDOperator)(unsafe.Pointer(&cqes[i].UserData))
+		// trigger or exit gracefully
+		if operator.FD == p.uring.Fd() {
+			// must clean trigger first
+			atomic.StoreUint32(&p.trigger, 0)
+			// if closed & exit
+			if operator.state == -1 {
+				p.uring.Close()
+				return true
+			}
+			operator.done()
+			continue
+		}
+
+		if !operator.do() {
+			continue
+		}
+
+		var events = cqes[i].Res
+		// check poll in
+		if events&syscall.EPOLLIN != 0 {
+			if operator.OnRead != nil {
+				// for non-connection
+				operator.OnRead(p)
+			} else {
+				// only for connection
+				var bs = operator.Inputs(p.barriers[i].bs)
+				if len(bs) > 0 {
+					var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
+					operator.InputAck(n)
+					if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
+						log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
+						p.appendHup(operator)
+						continue
+					}
+				}
+			}
+		}
+
+		// check hup
+		if events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0 {
+			p.appendHup(operator)
+			continue
+		}
+		if events&syscall.EPOLLERR != 0 {
+			// Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
+			// So we need to check the error here: if it is EAGAIN, do nothing; otherwise, still mark the operator as hup.
+			if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
+				p.appendHup(operator)
+			} else {
+				operator.done()
+			}
+			continue
+		}
+
+		// check poll out
+		if events&syscall.EPOLLOUT != 0 {
+			if operator.OnWrite != nil {
+				// for non-connection
+				operator.OnWrite(p)
+			} else {
+				// only for connection
+				var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
+				if len(bs) > 0 {
+					// TODO: Let the upper layer pass in whether to use ZeroCopy.
+					var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
+					operator.OutputAck(n)
+					if err != nil && err != syscall.EAGAIN {
+						log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error())
+						p.appendHup(operator)
+						continue
+					}
+				}
+			}
+		}
+		operator.done()
+	}
+	// hup conns together to avoid blocking the poll.
+	p.detaches()
+	return false
+}
+
+func (p *uringPoll) trig(userData uint64) error {
+	err := p.uring.Queue(Nop(), 0, userData)
+	if err != nil {
+		return err
+	}
+	_, err = p.uring.Submit()
+	return err
+}
+
+func (p *uringPoll) appendHup(operator *FDOperator) {
+	p.hups = append(p.hups, operator.OnHup)
+	operator.Control(PollDetach)
+	operator.done()
+}
+
+func (p *uringPoll) detaches() {
+	if len(p.hups) == 0 {
+		return
+	}
+	hups := p.hups
+	p.hups = nil
+	go func(onhups []func(p Poll) error) {
+		for i := range onhups {
+			if onhups[i] != nil {
+				onhups[i](p)
+			}
+		}
+	}(hups)
}
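poll_uring.go smuggles an *FDOperator through the 8-byte user_data field of each SQE/CQE by reinterpreting the uint64 as a pointer. A standalone sketch of just that round trip is below; the FDOperator here is a trimmed stand-in, and note that the GC cannot see a pointer stored this way, so the referent must be kept alive by other references (netpoll keeps FDOperators reachable elsewhere).

package main

import (
	"fmt"
	"unsafe"
)

// Trimmed stand-in for netpoll's FDOperator; only what the sketch needs.
type FDOperator struct {
	FD int
}

func main() {
	op := &FDOperator{FD: 42}

	// Pack: write the pointer's bits into the uint64 that becomes
	// sqe->user_data, as uringPoll.Control does.
	var userData uint64
	*(**FDOperator)(unsafe.Pointer(&userData)) = op

	// Unpack: reinterpret the completion's user_data as the pointer,
	// as uringPoll.handler does with cqes[i].UserData.
	got := *(**FDOperator)(unsafe.Pointer(&userData))
	fmt.Println(got.FD) // 42
}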
diff --git a/uring/benchmark/epoll_test.go b/uring/benchmark/epoll_test.go
new file mode 100644
index 00000000..14ff7e9e
--- /dev/null
+++ b/uring/benchmark/epoll_test.go
@@ -0,0 +1,47 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package benchmark

+import (
+	"syscall"
+	"testing"
+
+	_ "github.com/cloudwego/netpoll"
+)
+
+func BenchmarkEpoll(b *testing.B) {
+	p, err := syscall.EpollCreate1(0)
+	if err != nil {
+		panic(err)
+	}
+	defer syscall.Close(p)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		r0, _, e0 := syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
+		if e0 != 0 {
+			panic(e0)
+		}
+		_, err = syscall.Write(int(r0), []byte{1, 0, 0, 0, 0, 0, 0, 0})
+		MustNil(err)
+		syscall.Close(int(r0))
+	}
+	b.StopTimer()
+}
+
+func MustNil(err error) {
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/uring/benchmark/uring_test.go b/uring/benchmark/uring_test.go
new file mode 100644
index 00000000..72eab030
--- /dev/null
+++ b/uring/benchmark/uring_test.go
@@ -0,0 +1,39 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package benchmark
+
+import (
+	"testing"
+
+	. "github.com/cloudwego/netpoll/uring"
+)
+
+func BenchmarkUring(b *testing.B) {
+	u, err := IOURing(8)
+	if err != nil {
+		panic(err)
+	}
+	defer u.Close()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		u.Queue(Nop(), 0, 0)
+		_, err = u.Submit()
+		MustNil(err)
+		_, err = u.WaitCQE()
+		MustNil(err)
+		u.CQESeen()
+	}
+	b.StopTimer()
+}
diff --git a/uring/example/cat/main.go b/uring/example/cat/main.go
new file mode 100644
index 00000000..9a5eb658
--- /dev/null
+++ b/uring/example/cat/main.go
@@ -0,0 +1,153 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"log"
+	"os"
+
+	. "github.com/cloudwego/netpoll/uring"
+)
+
+const BLOCK_SZ = 1024
+
+type fileInfo struct {
+	fd      uintptr
+	fileSZ  int64
+	buffs   [][]byte
+	readvOp *ReadVOp /* Referred by readv/writev */
+}
+
+var fi fileInfo
+
+/*
+* Returns the size of the file whose open file descriptor is passed in.
+* Properly handles regular files and block devices as well.
+* */
+func getFileSize(file *os.File) int64 {
+	stat, err := file.Stat()
+	MustNil(err)
+
+	return stat.Size()
+}
+
+/*
+ * Output a string of characters of len length to stdout.
+ * (The C original used buffered, character-by-character output;
+ * here fmt.Printf is sufficient.)
+ * */
+func outputToConsole(buff []byte) {
+	fmt.Printf("%s", string(buff))
+}
+
+/*
+ * Wait for a completion to be available, fetch the data from
+ * the readv operation and print it to the console.
+ * */
+func getCompletionAndPrint(u *URing) (err error) {
+	cqe, err := u.WaitCQE()
+	MustNil(err)
+	if cqe.Res < 0 {
+		fmt.Printf("Async readv failed.\n")
+	}
+
+	blocks := int(fi.fileSZ) / BLOCK_SZ
+	if fi.fileSZ%BLOCK_SZ != 0 {
+		blocks++
+	}
+	for i := 0; i < blocks; i++ {
+		outputToConsole(fi.buffs[i])
+	}
+
+	u.CQESeen()
+
+	return nil
+}
+
+/*
+ * Submit the readv request via io_uring
+ * */
+func submitReadRequest(u *URing, fileName string) (err error) {
+	file, err := os.Open(fileName)
+	MustNil(err)
+
+	fileSZ := getFileSize(file)
+	bytesRemaining := fileSZ
+
+	blocks := int(fileSZ / BLOCK_SZ)
+	if fileSZ%BLOCK_SZ != 0 {
+		blocks++
+	}
+
+	buffs := make([][]byte, 0, blocks)
+
+	/*
+	 * For each block of the file we need to read, we allocate an iovec struct
+	 * which is indexed into the iovecs array. This array is passed in as part
+	 * of the submission. If you don't understand this, then you need to look
+	 * up how the readv() and writev() system calls work.
+	 * */
+	for bytesRemaining != 0 {
+		bytesToRead := bytesRemaining
+
+		if bytesToRead > BLOCK_SZ {
+			bytesToRead = BLOCK_SZ
+		}
+
+		buffs = append(buffs, make([]byte, bytesToRead))
+		bytesRemaining -= bytesToRead
+	}
+
+	fi = fileInfo{
+		fd:      file.Fd(),
+		fileSZ:  fileSZ,
+		buffs:   buffs,
+		readvOp: ReadV(file.Fd(), buffs, 0),
+	}
+	/* Setup a readv operation, user data */
+	err = u.Queue(fi.readvOp, 0, uint64(fi.fd))
+	MustNil(err)
+	/* Finally, submit the request */
+	_, err = u.Submit()
+	return err
+}
+
+func main() {
+	if len(os.Args) < 2 {
+		fmt.Printf("Usage: %s [file name] <[file name] ...>\n",
+			os.Args[0])
+		return
+	}
+
+	/* Initialize io_uring */
+	u, err := IOURing(8)
+	MustNil(err)
+	/* Call the clean-up function. */
+	defer u.Close()
+
+	for _, fileName := range os.Args[1:] {
+		err := submitReadRequest(u, fileName)
+		MustNil(err)
+
+		getCompletionAndPrint(u)
+	}
+}
+
+func MustNil(err error) {
+	if err != nil {
+		log.Fatal(err)
+	}
}
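The cat example reaps one CQE per file with WaitCQE. A hedged sketch of the batched alternative, queueing several reads and draining them with SubmitAndWait, PeekBatchCQE, and Advance (the same primitives uringPoll.Wait uses); file names and buffer sizes here are illustrative only:

package main

import (
	"fmt"
	"os"

	. "github.com/cloudwego/netpoll/uring"
)

func main() {
	u, err := IOURing(8)
	if err != nil {
		panic(err)
	}
	defer u.Close()

	files := []string{"/etc/hostname", "/etc/hosts"} // illustrative inputs
	bufs := make([][]byte, len(files))
	for i, name := range files {
		f, err := os.Open(name)
		if err != nil {
			panic(err)
		}
		defer f.Close()
		bufs[i] = make([]byte, 512)
		// user_data carries the slice index so each completion can be
		// matched back to its request.
		if err := u.Queue(Read(f.Fd(), bufs[i], 0), 0, uint64(i)); err != nil {
			panic(err)
		}
	}

	// One enter for all submissions; block until every read has completed.
	if _, err := u.SubmitAndWait(uint32(len(files))); err != nil {
		panic(err)
	}

	cqes := make([]*URingCQE, len(files))
	n := u.PeekBatchCQE(cqes)
	for _, cqe := range cqes[:n] {
		fmt.Printf("%s: %d bytes\n", files[cqe.Data()], cqe.Res)
	}
	u.Advance(uint32(n)) // mark all n CQEs as seen
}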
"github.com/cloudwego/netpoll/uring" +) + +const ( + ENTRIES = 4096 + DEFAULT_SERVER_PORT = 8000 + QUEUE_DEPTH = 256 + READ_SZ = 8192 +) + +type eventType int + +const ( + EVENT_TYPE_ACCEPT eventType = iota + EVENT_TYPE_READ + EVENT_TYPE_WRITE +) + +type request struct { + fd int + eventType eventType + recvOp *RecvOp + sendOp *SendOp +} + +var requests [4096]request +var bufs [][]byte + +func init() { + for fd := range requests { + requests[fd].recvOp = Recv(uintptr(fd), nil, 0) + requests[fd].sendOp = Send(uintptr(fd), nil, 0) + } + bufs = make([][]byte, ENTRIES) + for idx := range bufs { + bufs[idx] = make([]byte, READ_SZ) + } +} + +func main() { + serverSockFd, err := setupListeningSocket(DEFAULT_SERVER_PORT) + MustNil(err) + defer syscall.Close(serverSockFd) + + fmt.Printf("ZeroHTTPd listening on port: %d\n", DEFAULT_SERVER_PORT) + + u, err := IOURing(ENTRIES) + MustNil(err) + defer u.Close() + + serverLoop(u, serverSockFd) +} + +func setupListeningSocket(port int) (serverSockFd int, err error) { + serverSockFd, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0) + MustNil(err) + + err = syscall.SetsockoptInt(serverSockFd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) + MustNil(err) + + err = syscall.Bind(serverSockFd, &syscall.SockaddrInet4{Port: port}) + MustNil(err) + + err = syscall.Listen(serverSockFd, QUEUE_DEPTH) + MustNil(err) + return +} + +func serverLoop(u *URing, serverSockFd int) { + accept := Accept(uintptr(serverSockFd), 0) + addAcceptRequest(u, accept) + + cqes := make([]*URingCQE, QUEUE_DEPTH) + + for { + _, err := u.Submit() + MustNil(err) + + _, err = u.WaitCQE() + if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) { + continue + } + MustNil(err) + + for i, n := 0, u.PeekBatchCQE(cqes); i < n; i++ { + cqe := cqes[i] + + userData := requests[cqe.UserData] + eventType := userData.eventType + res := cqe.Res + + u.CQESeen() + + switch eventType { + case EVENT_TYPE_ACCEPT: + addReadRequest(u, int(res)) + addAcceptRequest(u, accept) + case EVENT_TYPE_READ: + addReadRequest(u, userData.fd) + case EVENT_TYPE_WRITE: + if res <= 0 { + syscall.Shutdown(userData.fd, syscall.SHUT_RDWR) + } else { + addWriteRequest(u, userData.fd, res) + } + } + } + } +} + +func addAcceptRequest(u *URing, accept *AcceptOp) { + requests[accept.Fd()].fd = accept.Fd() + requests[accept.Fd()].eventType = EVENT_TYPE_ACCEPT + + err := u.Queue(accept, 0, uint64(accept.Fd())) + MustNil(err) +} + +func addReadRequest(u *URing, fd int) { + requests[fd].fd = fd + requests[fd].eventType = EVENT_TYPE_READ + requests[fd].recvOp.SetBuff(bufs[fd]) + + err := u.Queue(requests[fd].recvOp, 0, uint64(fd)) + MustNil(err) +} + +func addWriteRequest(u *URing, fd int, bytes int32) { + requests[fd].fd = fd + requests[fd].eventType = EVENT_TYPE_WRITE + requests[fd].sendOp.SetBuff(bufs[fd][:bytes]) + + err := u.Queue(requests[fd].sendOp, 0, uint64(fd)) + MustNil(err) +} + +func MustNil(err error) { + if err != nil { + log.Fatal(err) + } +} diff --git a/uring/sys_enter.go b/uring/sys_enter.go new file mode 100644 index 00000000..ed3445b0 --- /dev/null +++ b/uring/sys_enter.go @@ -0,0 +1,143 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package uring + +import ( + "syscall" +) + +// Submission Queue Entry, IO submission data structure +type URingSQE struct { + OpCode uint8 // type of operation for this sqe + Flags uint8 // IOSQE_ flags + IOPrio uint16 // ioprio for the request + Fd int32 // file descriptor to do IO on + Off uint64 // offset into file + Addr uint64 // pointer to buffer or iovecs + Len uint32 // buffer size or number of iovecs + UnionFlags uint32 + UserData uint64 // data to be passed back at completion time + + pad [3]uint64 +} + +// PrepRW implements SQE +func (s *URingSQE) PrepRW(op uint8, fd int32, addr uintptr, len uint32, offset uint64) { + s.OpCode = op + s.Flags = 0 + s.IOPrio = 0 + s.Fd = fd + s.Off = offset + s.setAddr(addr) + s.Len = len + s.UnionFlags = 0 + s.UserData = 0 + s.pad[0] = 0 + s.pad[1] = 0 + s.pad[2] = 0 +} + +// Completion Queue Eveny, IO completion data structure +type URingCQE struct { + UserData uint64 // sqe->data submission passed back + Res int32 // result code for this event + Flags uint32 + + // TODO: If the ring is initialized with IORING_SETUP_CQE32, then this field + // contains 16-bytes of padding, doubling the size of the CQE. + // BigCQE [2]uint64 +} + +// Error implements CQE +func (c *URingCQE) Error() error { + if c.Res < 0 { + return syscall.Errno(uintptr(-c.Res)) + } + return nil +} + +// Data implements CQE +func (c *URingCQE) Data() uint64 { + return c.UserData +} + +// setData sets the user data field of the SQE instance passed in. +func (s *URingSQE) setUserData(ud uint64) { + s.UserData = ud +} + +// setFlags sets the flags field of the SQE instance passed in. +func (s *URingSQE) setFlags(flags uint8) { + s.Flags = flags +} + +// setAddr sets the flags field of the SQE instance passed in. +func (s *URingSQE) setAddr(addr uintptr) { + s.Addr = uint64(addr) +} + +// Flags of CQE +// IORING_CQE_F_BUFFER If set, the upper 16 bits are the buffer ID +// IORING_CQE_F_MORE If set, parent SQE will generate more CQE entries +// IORING_CQE_F_SOCK_NONEMPTY If set, more data to read after socket recv +const ( + IORING_CQE_F_BUFFER uint32 = 1 << iota + IORING_CQE_F_MORE + IORING_CQE_F_SOCK_NONEMPTY +) + +const IORING_CQE_BUFFER_SHIFT = 16 + +// io_uring_enter(2) flags +const ( + IORING_ENTER_GETEVENTS uint32 = 1 << iota + IORING_ENTER_SQ_WAKEUP + IORING_ENTER_SQ_WAIT + IORING_ENTER_EXT_ARG + IORING_ENTER_REGISTERED_RING +) + +// If sqe->file_index is set to this for opcodes that instantiate a new +// direct descriptor (like openat/openat2/accept), then io_uring will allocate +// an available direct descriptor instead of having the application pass one +// in. The picked direct descriptor will be returned in cqe->res, or -ENFILE +// if the space is full. 
+const ( + IOSQE_FIXED_FILE_BIT = iota + IOSQE_IO_DRAIN_BIT + IOSQE_IO_LINK_BIT + IOSQE_IO_HARDLINK_BIT + IOSQE_ASYNC_BIT + IOSQE_BUFFER_SELECT_BIT + IOSQE_CQE_SKIP_SUCCESS_BIT +) + +// Flags of SQE +const ( + // IOSQE_FIXED_FILE means use fixed fileset + IOSQE_FIXED_FILE uint8 = 1 << IOSQE_FIXED_FILE_BIT + // IOSQE_IO_DRAIN means issue after inflight IO + IOSQE_IO_DRAIN uint8 = 1 << IOSQE_IO_DRAIN_BIT + // IOSQE_IO_LINK means links next sqe + IOSQE_IO_LINK uint8 = 1 << IOSQE_IO_LINK_BIT + // IOSQE_IO_HARDLINK means like LINK, but stronger + IOSQE_IO_HARDLINK uint8 = 1 << IOSQE_IO_HARDLINK_BIT + // IOSQE_ASYNC means always go async + IOSQE_ASYNC uint8 = 1 << IOSQE_ASYNC_BIT + // IOSQE_BUFFER_SELECT means select buffer from sqe->buf_group + IOSQE_BUFFER_SELECT uint8 = 1 << IOSQE_BUFFER_SELECT_BIT + // IOSQE_CQE_SKIP_SUCCESS means don't post CQE if request succeeded + IOSQE_CQE_SKIP_SUCCESS uint8 = 1 << IOSQE_CQE_SKIP_SUCCESS_BIT +) diff --git a/uring/sys_mmap.go b/uring/sys_mmap.go new file mode 100644 index 00000000..8f8f4a22 --- /dev/null +++ b/uring/sys_mmap.go @@ -0,0 +1,115 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package uring + +import ( + "syscall" + "unsafe" +) + +// sysMmap is used to free the URingSQE and URingCQE, +func (u *URing) sysMunmap() (err error) { + err = mumap(u.sqRing.buff) + if u.cqRing.buff != nil && &u.cqRing.buff[0] != &u.sqRing.buff[0] { + err = mumap(u.cqRing.buff) + } + return +} + +// sysMmap is used to configure the URingSQE and URingCQE, +// it should only be called after the sysSetUp function has completed successfully. 
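The IOSQE_* flags above compose with bitwise OR and travel through Queue()'s flags argument into sqe->flags. A hedged sketch of IOSQE_IO_LINK, which makes the kernel start the next SQE only after the current one succeeds; it uses the Write/Read ops from this PR's sys_op.go and assumes a kernel new enough for SQE links (5.3+):

package main

import (
	"fmt"
	"os"

	. "github.com/cloudwego/netpoll/uring"
)

func main() {
	u, err := IOURing(8)
	if err != nil {
		panic(err)
	}
	defer u.Close()

	f, err := os.CreateTemp("", "sqe-link-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	out := []byte("hello")
	in := make([]byte, len(out))

	// IOSQE_IO_LINK chains this SQE to the next one: the read below is
	// started only after the write has completed successfully.
	if err := u.Queue(Write(f.Fd(), out, 0), IOSQE_IO_LINK, 1); err != nil {
		panic(err)
	}
	if err := u.Queue(Read(f.Fd(), in, 0), 0, 2); err != nil {
		panic(err)
	}
	if _, err := u.SubmitAndWait(2); err != nil {
		panic(err)
	}
	fmt.Printf("read back: %q\n", in)
}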
diff --git a/uring/sys_mmap.go b/uring/sys_mmap.go
new file mode 100644
index 00000000..8f8f4a22
--- /dev/null
+++ b/uring/sys_mmap.go
@@ -0,0 +1,115 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+import (
+	"syscall"
+	"unsafe"
+)
+
+// sysMunmap is used to unmap the SQ and CQ ring buffers.
+func (u *URing) sysMunmap() (err error) {
+	err = mumap(u.sqRing.buff)
+	if u.cqRing.buff != nil && &u.cqRing.buff[0] != &u.sqRing.buff[0] {
+		err = mumap(u.cqRing.buff)
+	}
+	return
+}
+
+// sysMmap is used to map the SQ and CQ ring buffers;
+// it should only be called after the sysSetUp function has completed successfully.
+func (u *URing) sysMmap(p *ringParams) (err error) {
+	size := _sizeCQE
+	if p.flags&IORING_SETUP_CQE32 != 0 {
+		size += _sizeCQE
+	}
+	u.sqRing.ringSize = uint64(p.sqOffset.array) + uint64(p.sqEntries*(uint32)(_sizeU32))
+	u.cqRing.ringSize = uint64(p.cqOffset.cqes) + uint64(p.cqEntries*(uint32)(size))
+
+	if p.features&IORING_FEAT_SINGLE_MMAP != 0 {
+		if u.cqRing.ringSize > u.sqRing.ringSize {
+			u.sqRing.ringSize = u.cqRing.ringSize
+		}
+		u.cqRing.ringSize = u.sqRing.ringSize
+	}
+
+	data, err := mmap(u.fd, 0, int(u.sqRing.ringSize))
+	if err != nil {
+		return err
+	}
+	u.sqRing.buff = data
+
+	if p.features&IORING_FEAT_SINGLE_MMAP != 0 {
+		u.cqRing.buff = u.sqRing.buff
+	} else {
+		data, err = mmap(u.fd, int64(IORING_OFF_CQ_RING), int(u.cqRing.ringSize))
+		if err != nil {
+			u.sysMunmap()
+			return err
+		}
+		u.cqRing.buff = data
+	}
+
+	ringStart := &u.sqRing.buff[0]
+	u.sqRing.kHead = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.head)))
+	u.sqRing.kTail = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.tail)))
+	u.sqRing.kRingMask = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.ringMask)))
+	u.sqRing.kRingEntries = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.ringEntries)))
+	u.sqRing.kFlags = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.flags)))
+	u.sqRing.kDropped = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.dropped)))
+	u.sqRing.array = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.sqOffset.array)))
+
+	size = uintptr(p.sqEntries) * _sizeSQE
+
+	buff, err := mmap(u.fd, int64(IORING_OFF_SQES), int(size))
+	if err != nil {
+		_ = u.sysMunmap()
+		return err
+	}
+	u.sqRing.sqeBuff = buff
+
+	cqRingPtr := uintptr(unsafe.Pointer(&u.cqRing.buff[0]))
+	ringStart = &u.cqRing.buff[0]
+
+	u.cqRing.kHead = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.head)))
+	u.cqRing.kTail = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.tail)))
+	u.cqRing.kRingMask = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.ringMask)))
+	u.cqRing.kRingEntries = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.ringEntries)))
+	u.cqRing.kOverflow = (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.overflow)))
+	u.cqRing.cqes = (*URingCQE)(unsafe.Pointer(uintptr(unsafe.Pointer(ringStart)) + uintptr(p.cqOffset.cqes)))
+	if p.cqOffset.flags != 0 {
+		u.cqRing.kFlags = cqRingPtr + uintptr(p.cqOffset.flags)
+	}
+
+	return nil
+}
+
+func mumap(b []byte) (err error) {
+	return syscall.Munmap(b)
+}
+
+func mmap(fd int, offset int64, length int) (data []byte, err error) {
+	return syscall.Mmap(fd, offset, length, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_POPULATE)
+}
+
+// Magic offsets for the application to mmap the data it needs
+const (
+	// IORING_OFF_SQ_RING maps sqring to program memory space
+	IORING_OFF_SQ_RING uint64 = 0
+	// IORING_OFF_CQ_RING maps cqring to program memory space
+	IORING_OFF_CQ_RING uint64 = 0x8000000
+	// IORING_OFF_SQES maps sqes array to program memory space
+	IORING_OFF_SQES uint64 = 0x10000000
+)
diff --git a/uring/sys_op.go b/uring/sys_op.go
new file mode 100644
index 00000000..eb1037b3
--- /dev/null
+++ b/uring/sys_op.go
@@ -0,0 +1,563 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+import (
+	"syscall"
+	"time"
+	"unsafe"
+
+	"golang.org/x/sys/unix"
+)
+
+// Op supports operations for SQE
+type Op interface {
+	Prep(*URingSQE)
+	getFlag() uint8
+}
+
+// Opcodes of URing operations
+const (
+	IORING_OP_NOP uint8 = iota
+	IORING_OP_READV
+	IORING_OP_WRITEV
+	IORING_OP_FSYNC
+	IORING_OP_READ_FIXED
+	IORING_OP_WRITE_FIXED
+	IORING_OP_POLL_ADD
+	IORING_OP_POLL_REMOVE
+	IORING_OP_SYNC_FILE_RANGE
+	IORING_OP_SENDMSG
+	IORING_OP_RECVMSG
+	IORING_OP_TIMEOUT
+	IORING_OP_TIMEOUT_REMOVE
+	IORING_OP_ACCEPT
+	IORING_OP_ASYNC_CANCEL
+	IORING_OP_LINK_TIMEOUT
+	IORING_OP_CONNECT
+	IORING_OP_FALLOCATE
+	IORING_OP_OPENAT
+	IORING_OP_CLOSE
+	IORING_OP_RSRC_UPDATE
+	IORING_OP_STATX
+	IORING_OP_READ
+	IORING_OP_WRITE
+	IORING_OP_FADVISE
+	IORING_OP_MADVISE
+	IORING_OP_SEND
+	IORING_OP_RECV
+	IORING_OP_OPENAT2
+	IORING_OP_EPOLL_CTL
+	IORING_OP_SPLICE
+	IORING_OP_PROVIDE_BUFFERS
+	IORING_OP_REMOVE_BUFFERS
+	IORING_OP_TEE
+	IORING_OP_SHUTDOWN
+	IORING_OP_RENAMEAT
+	IORING_OP_UNLINKAT
+	IORING_OP_MKDIRAT
+	IORING_OP_SYMLINKAT
+	IORING_OP_LINKAT
+	IORING_OP_MSG_RING
+	IORING_OP_FSETXATTR
+	IORING_OP_SETXATTR
+	IORING_OP_FGETXATTR
+	IORING_OP_GETXATTR
+	IORING_OP_SOCKET
+	IORING_OP_URING_CMD
+	IORING_OP_SENDZC_NOTIF
+
+	// this goes last, obviously
+	IORING_OP_LAST
+)
+
+// IORING_OP_FILES_UPDATE is the historical alias of IORING_OP_RSRC_UPDATE; it is declared outside the iota block so the opcodes after IORING_OP_RSRC_UPDATE keep incrementing.
+const IORING_OP_FILES_UPDATE = IORING_OP_RSRC_UPDATE
+
+// timeoutFlags of SQE
+const (
+	IORING_TIMEOUT_ABS uint8 = 1 << iota
+	IORING_TIMEOUT_UPDATE
+	IORING_TIMEOUT_BOOTTIME
+	IORING_TIMEOUT_REALTIME
+	IORING_LINK_TIMEOUT_UPDATE
+	IORING_TIMEOUT_ETIME_SUCCESS
+	IORING_TIMEOUT_CLOCK_MASK  = IORING_TIMEOUT_BOOTTIME | IORING_TIMEOUT_REALTIME
+	IORING_TIMEOUT_UPDATE_MASK = IORING_TIMEOUT_UPDATE | IORING_LINK_TIMEOUT_UPDATE
+)
+
+// sqe->splice_flags, extends splice(2) flags
+const SPLICE_F_FD_IN_FIXED uint32 = 1 << 31 // the last bit of __u32
+
+// POLL_ADD flags. Note that since sqe->poll_events is the flag space, the
+// command flags for POLL_ADD are stored in sqe->len.
+
+// IORING_POLL_ADD_MULTI	Multishot poll. Sets IORING_CQE_F_MORE if
+//				the poll handler will continue to report
+//				CQEs on behalf of the same SQE.
+
+// IORING_POLL_UPDATE		Update existing poll request, matching
+//				sqe->addr as the old user_data field.
+
+// IORING_POLL_LEVEL		Level triggered poll.
+const (
+	IORING_POLL_ADD_MULTI uint8 = 1 << iota
+	IORING_POLL_UPDATE_EVENTS
+	IORING_POLL_UPDATE_USER_DATA
+	IORING_POLL_ADD_LEVEL
+)
+
+// ASYNC_CANCEL flags.
+
+// IORING_ASYNC_CANCEL_ALL	Cancel all requests that match the given key
+// IORING_ASYNC_CANCEL_FD	Key off 'fd' for cancelation rather than the
+//				request 'user_data'
+// IORING_ASYNC_CANCEL_ANY	Match any request
+// IORING_ASYNC_CANCEL_FD_FIXED	'fd' passed in is a fixed descriptor
+const (
+	IORING_ASYNC_CANCEL_ALL uint8 = 1 << iota
+	IORING_ASYNC_CANCEL_FD
+	IORING_ASYNC_CANCEL_ANY
+	IORING_ASYNC_CANCEL_FD_FIXED
+)
+
+// send/sendmsg and recv/recvmsg flags (sqe->ioprio)
+
+// IORING_RECVSEND_POLL_FIRST	If set, instead of first attempting to send
+//				or receive and arm poll if that yields an
+//				-EAGAIN result, arm poll upfront and skip
+//				the initial transfer attempt.
+
+// IORING_RECV_MULTISHOT	Multishot recv. Sets IORING_CQE_F_MORE if
+//				the handler will continue to report
+//				CQEs on behalf of the same SQE.
+
+// IORING_RECVSEND_FIXED_BUF	Use registered buffers, the index is stored in
+//				the buf_index field.
+
+// IORING_RECVSEND_NOTIF_FLUSH	Flush a notification after a successful send.
+//				Only for zerocopy sends.
+const (
+	IORING_RECVSEND_POLL_FIRST uint8 = 1 << iota
+	IORING_RECV_MULTISHOT
+	IORING_RECVSEND_FIXED_BUF
+	IORING_RECVSEND_NOTIF_FLUSH
+)
+
+// accept flags stored in sqe->ioprio
+const IORING_ACCEPT_MULTISHOT uint8 = 1 << iota
+
+// IORING_OP_RSRC_UPDATE flags
+const (
+	IORING_RSRC_UPDATE_FILES uint8 = iota
+	IORING_RSRC_UPDATE_NOTIF
+)
+
+// IORING_OP_MSG_RING command types, stored in sqe->addr
+const (
+	IORING_MSG_DATA    uint8 = iota // pass sqe->len as 'res' and off as user_data
+	IORING_MSG_SEND_FD              // send a registered fd to another ring
+)
+
+// IORING_OP_MSG_RING flags (sqe->msg_ring_flags)
+
+// IORING_MSG_RING_CQE_SKIP	Don't post a CQE to the target ring. Not
+//				applicable for IORING_MSG_DATA, obviously.
+const IORING_MSG_RING_CQE_SKIP uint8 = 1 << iota
+
+// ------------------------------------------ implement Nop ------------------------------------------
+
+func Nop() *NopOp {
+	return &NopOp{}
+}
+
+type NopOp struct{}
+
+func (op *NopOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), -1, uintptr(unsafe.Pointer(nil)), 0, 0)
+}
+
+func (op *NopOp) getFlag() uint8 {
+	return IORING_OP_NOP
+}
+
+// ------------------------------------------ implement Read ------------------------------------------
+
+func Read(fd uintptr, nbytes []byte, offset uint64) *ReadOp {
+	return &ReadOp{
+		fd:     fd,
+		nbytes: nbytes,
+		offset: offset,
+	}
+}
+
+type ReadOp struct {
+	fd     uintptr
+	nbytes []byte
+	offset uint64
+}
+
+func (op *ReadOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(&op.nbytes[0])), uint32(len(op.nbytes)), op.offset)
+}
+
+func (op *ReadOp) getFlag() uint8 {
+	return IORING_OP_READ
+}
+
+// ------------------------------------------ implement Write ------------------------------------------
+
+func Write(fd uintptr, nbytes []byte, offset uint64) *WriteOp {
+	return &WriteOp{
+		fd:     fd,
+		nbytes: nbytes,
+		offset: offset,
+	}
+}
+
+type WriteOp struct {
+	fd     uintptr
+	nbytes []byte
+	offset uint64
+}
+
+func (op *WriteOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(&op.nbytes[0])), uint32(len(op.nbytes)), op.offset)
+}
+
+func (op *WriteOp) getFlag() uint8 {
+	return IORING_OP_WRITE
+}
+
+// ------------------------------------------ implement ReadV ------------------------------------------
+
+func ReadV(fd uintptr, iovecs [][]byte, offset uint64) *ReadVOp {
+	buff := make([]syscall.Iovec, len(iovecs))
+	for i := range iovecs {
+		buff[i].Base = &iovecs[i][0]
+		buff[i].SetLen(len(iovecs[i]))
+	}
+	return &ReadVOp{
+		fd:     fd,
+		nrVecs: uint32(len(buff)),
+		ioVecs: buff,
+		offset: offset,
+	}
+}
+
+type ReadVOp struct {
+	fd     uintptr
+	nrVecs uint32
+	ioVecs []syscall.Iovec
+	offset uint64
+}
+
+func (op *ReadVOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(&op.ioVecs[0])), op.nrVecs, op.offset)
+}
+
+func (op *ReadVOp) getFlag() uint8 {
+	return IORING_OP_READV
+}
+
+// ------------------------------------------ implement WriteV ------------------------------------------
+
+func WriteV(fd uintptr, iovecs [][]byte, offset uint64) *WriteVOp {
+	buff := make([]syscall.Iovec, len(iovecs))
+	for i := range iovecs {
+		buff[i].SetLen(len(iovecs[i]))
+		buff[i].Base = &iovecs[i][0]
+	}
+	return &WriteVOp{
+		fd:     fd,
+		ioVecs: buff,
+		offset: offset,
+	}
+}
+
+type WriteVOp struct {
+	fd     uintptr
+	ioVecs []syscall.Iovec
+	offset uint64
+}
+
+func (op *WriteVOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(&op.ioVecs[0])), uint32(len(op.ioVecs)), op.offset)
+}
+
+func (op *WriteVOp) getFlag() uint8 {
+	return IORING_OP_WRITEV
+}
+
+// ------------------------------------------ implement Close ------------------------------------------
+
+func Close(fd uintptr) *CloseOp {
+	return &CloseOp{
+		fd: fd,
+	}
+}
+
+type CloseOp struct {
+	fd uintptr
+}
+
+func (op *CloseOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), 0, 0, 0)
+}
+
+func (op *CloseOp) getFlag() uint8 {
+	return IORING_OP_CLOSE
+}
+
+// ------------------------------------------ implement RecvMsg ------------------------------------------
+
+func RecvMsg(fd int, msg *syscall.Msghdr, flags uint32) *RecvMsgOp {
+	return &RecvMsgOp{
+		fd:    fd,
+		msg:   msg,
+		flags: flags,
+	}
+}
+
+type RecvMsgOp struct {
+	fd    int
+	msg   *syscall.Msghdr
+	flags uint32
+}
+
+func (op *RecvMsgOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(op.msg)), 1, 0)
+	sqe.setFlags(uint8(op.flags))
+}
+
+func (op *RecvMsgOp) getFlag() uint8 {
+	return IORING_OP_RECVMSG
+}
+
+// ------------------------------------------ implement SendMsg ------------------------------------------
+
+func SendMsg(fd int, msg *syscall.Msghdr, flags uint32) *SendMsgOp {
+	return &SendMsgOp{
+		fd:    fd,
+		msg:   msg,
+		flags: flags,
+	}
+}
+
+type SendMsgOp struct {
+	fd    int
+	msg   *syscall.Msghdr
+	flags uint32
+}
+
+func (op *SendMsgOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(op.msg)), 1, 0)
+	sqe.setFlags(uint8(op.flags))
+}
+
+func (op *SendMsgOp) getFlag() uint8 {
+	return IORING_OP_SENDMSG
+}
+
+// ------------------------------------------ implement Accept ------------------------------------------
+
+func Accept(fd uintptr, flags uint32) *AcceptOp {
+	return &AcceptOp{
+		fd:    fd,
+		addr:  &unix.RawSockaddrAny{},
+		len:   unix.SizeofSockaddrAny,
+		flags: flags,
+	}
+}
+
+type AcceptOp struct {
+	fd    uintptr
+	addr  *unix.RawSockaddrAny
+	len   uint32
+	flags uint32
+}
+
+func (op *AcceptOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(op.addr)), 0, uint64(uintptr(unsafe.Pointer(&op.len))))
+	sqe.UnionFlags = op.flags
+}
+
+func (op *AcceptOp) getFlag() uint8 {
+	return IORING_OP_ACCEPT
+}
+
+func (op *AcceptOp) Fd() int {
+	return int(op.fd)
+}
+
+// ------------------------------------------ implement Recv ------------------------------------------
+
+func Recv(sockFd uintptr, buf []byte, flags uint32) *RecvOp {
+	return &RecvOp{
+		fd:    sockFd,
+		buf:   buf,
+		flags: flags,
+	}
+}
+
+type RecvOp struct {
+	fd    uintptr
+	buf   []byte
+	flags uint32
+}
+
+func (op *RecvOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(&op.buf[0])), uint32(len(op.buf)), 0)
+	sqe.setFlags(uint8(op.flags))
+}
+
+func (op *RecvOp) getFlag() uint8 {
+	return IORING_OP_RECV
+}
+
+func (op *RecvOp) Fd() int {
+	return int(op.fd)
+}
+
+func (op *RecvOp) SetBuff(buf []byte) {
+	op.buf = buf
+}
+
+// ------------------------------------------ implement Send ------------------------------------------
+
+func Send(sockFd uintptr, buf []byte, flags uint32) *SendOp {
+	return &SendOp{
+		fd:    sockFd,
+		buf:   buf,
+		flags: flags,
+	}
+}
+
+type SendOp struct {
+	fd    uintptr
+	buf   []byte
+	flags uint32
+}
+
+func (op *SendOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(&op.buf[0])), uint32(len(op.buf)), 0)
+	sqe.setFlags(uint8(op.flags))
+}
+
+func (op *SendOp) getFlag() uint8 {
+	return IORING_OP_SEND
+}
+
+func (op *SendOp) Fd() int {
+	return int(op.fd)
+}
+
+func (op *SendOp) SetBuff(buf []byte) {
+	op.buf = buf
+}
+
+// ------------------------------------------ implement Timeout ------------------------------------------
+
+func Timeout(duration time.Duration) *TimeoutOp {
+	return &TimeoutOp{
+		dur: duration,
+	}
+}
+
+type TimeoutOp struct {
+	dur time.Duration
+}
+
+func (op *TimeoutOp) Prep(sqe *URingSQE) {
+	spec := syscall.NsecToTimespec(op.dur.Nanoseconds())
+	sqe.PrepRW(op.getFlag(), -1, uintptr(unsafe.Pointer(&spec)), 1, 0)
+}
+
+func (op *TimeoutOp) getFlag() uint8 {
+	return IORING_OP_TIMEOUT
+}
+
+// ------------------------------------------ implement PollAdd ------------------------------------------
+
+func PollAdd(fd uintptr, mask uint32) *PollAddOp {
+	return &PollAddOp{
+		fd:       fd,
+		pollMask: mask,
+	}
+}
+
+type PollAddOp struct {
+	fd       uintptr
+	pollMask uint32
+}
+
+func (op *PollAddOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.fd), uintptr(unsafe.Pointer(nil)), 0, 0)
+	sqe.UnionFlags = op.pollMask
+}
+
+func (op *PollAddOp) getFlag() uint8 {
+	return IORING_OP_POLL_ADD
+}
+
+// ------------------------------------------ implement PollRemove ------------------------------------------
+
+func PollRemove(data uint64) *PollRemoveOp {
+	return &PollRemoveOp{
+		userData: data,
+	}
+}
+
+type PollRemoveOp struct {
+	userData uint64
+}
+
+func (op *PollRemoveOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), -1, uintptr(unsafe.Pointer(nil)), 0, 0)
+	sqe.setAddr(uintptr(op.userData))
+}
+
+func (op *PollRemoveOp) getFlag() uint8 {
+	return IORING_OP_POLL_REMOVE
+}
+
+// ------------------------------------------ implement EpollCtl ------------------------------------------
+
+// named URingEpollCtl to avoid a name collision with syscall.EpollCtl
+func URingEpollCtl(epfd, fd uintptr, opCode uint32, epollEvent unsafe.Pointer) *EpollCtlOp {
+	return &EpollCtlOp{
+		epfd:       epfd,
+		fd:         fd,
+		opCode:     opCode,
+		epollEvent: epollEvent,
+	}
+}
+
+type EpollCtlOp struct {
+	epfd       uintptr
+	fd         uintptr
+	opCode     uint32
+	epollEvent unsafe.Pointer
+}
+
+func (op *EpollCtlOp) Prep(sqe *URingSQE) {
+	sqe.PrepRW(op.getFlag(), int32(op.epfd), uintptr(op.epollEvent), op.opCode, uint64(op.fd))
+}
+
+func (op *EpollCtlOp) getFlag() uint8 {
+	return IORING_OP_EPOLL_CTL
+}
diff --git a/uring/sys_probe.go b/uring/sys_probe.go
new file mode 100644
index 00000000..779faff3
--- /dev/null
+++ b/uring/sys_probe.go
@@ -0,0 +1,48 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+// Probe describes the io_uring capabilities supported by the kernel
+type Probe struct {
+	lastOp uint8 // last opcode supported
+	opsLen uint8 // length of ops[] array below
+	resv   uint16
+	resv2  [3]uint32
+	ops    [256]probeOp
+}
+
+// probeOp is params of Probe
+type probeOp struct {
+	op    uint8
+	resv  uint8
+	flags uint16 // IO_URING_OP_* flags
+	resv2 uint32
+}
+
+// Op implements Probe, returns info for the operation at the given index.
+func (p *Probe) Op(idx int) *probeOp {
+	return &p.ops[idx]
+}
+
+// OpFlagSupported implements Probe, reports whether the given opcode is supported.
+func (p *Probe) OpFlagSupported(op uint8) uint16 {
+	if op > p.lastOp {
+		return 0
+	}
+	return p.ops[op].flags & IO_URING_OP_SUPPORTED
+}
+
+// IO_URING_OP_SUPPORTED is set in probeOp.flags if io_uring supports the opcode
+const IO_URING_OP_SUPPORTED uint16 = 1 << 0
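Since IORING_OP_* coverage varies with the kernel version, Probe is the natural way to feature-detect the opcodes the poller depends on before relying on them. A small hedged usage sketch:

package main

import (
	"fmt"

	. "github.com/cloudwego/netpoll/uring"
)

func main() {
	u, err := IOURing(8)
	if err != nil {
		panic(err)
	}
	defer u.Close()

	probe, err := u.Probe()
	if err != nil {
		panic(err)
	}
	// Check the opcodes the io_uring poller uses before enabling it.
	for name, code := range map[string]uint8{
		"IORING_OP_POLL_ADD": IORING_OP_POLL_ADD,
		"IORING_OP_ACCEPT":   IORING_OP_ACCEPT,
		"IORING_OP_RECV":     IORING_OP_RECV,
	} {
		fmt.Printf("%s supported: %v\n", name, probe.OpFlagSupported(code) != 0)
	}
}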
diff --git a/uring/sys_register.go b/uring/sys_register.go
new file mode 100644
index 00000000..4c7603ae
--- /dev/null
+++ b/uring/sys_register.go
@@ -0,0 +1,102 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+import (
+	"syscall"
+	"unsafe"
+)
+
+// io_uring_register(2) opcodes and arguments
+const (
+	IORING_REGISTER_BUFFERS = iota
+	IORING_UNREGISTER_BUFFERS
+	IORING_REGISTER_FILES
+	IORING_UNREGISTER_FILES
+	IORING_REGISTER_EVENTFD
+	IORING_UNREGISTER_EVENTFD
+	IORING_REGISTER_FILES_UPDATE
+	IORING_REGISTER_EVENTFD_ASYNC
+	IORING_REGISTER_PROBE
+	IORING_REGISTER_PERSONALITY
+	IORING_UNREGISTER_PERSONALITY
+	IORING_REGISTER_RESTRICTIONS
+	IORING_REGISTER_ENABLE_RINGS
+
+	/* extended with tagging */
+	IORING_REGISTER_FILES2
+	IORING_REGISTER_FILES_UPDATE2
+	IORING_REGISTER_BUFFERS2
+	IORING_REGISTER_BUFFERS_UPDATE
+
+	/* set/clear io-wq thread affinities */
+	IORING_REGISTER_IOWQ_AFF
+	IORING_UNREGISTER_IOWQ_AFF
+
+	/* set/get max number of io-wq workers */
+	IORING_REGISTER_IOWQ_MAX_WORKERS
+
+	/* register/unregister io_uring fd with the ring */
+	IORING_REGISTER_RING_FDS
+	IORING_UNREGISTER_RING_FDS
+
+	/* register ring based provide buffer group */
+	IORING_REGISTER_PBUF_RING
+	IORING_UNREGISTER_PBUF_RING
+
+	/* this goes last */
+	IORING_REGISTER_LAST
+)
+
+// ------------------------------------------ implement io_uring_register ------------------------------------------
+
+// RegisterBuffers registers shared buffers
+func (u *URing) RegisterBuffers(buffers []syscall.Iovec) error {
+	err := SysRegister(u.fd, IORING_REGISTER_BUFFERS, unsafe.Pointer(&buffers[0]), len(buffers))
+	SMP_SQRING.Store(u.sqRing)
+	return err
+}
+
+// UnRegisterBuffers unregisters shared buffers
+func (u *URing) UnRegisterBuffers() error {
+	err := SysRegister(u.fd, IORING_UNREGISTER_BUFFERS, unsafe.Pointer(nil), 0)
+	SMP_SQRING.Store(u.sqRing)
+	return err
+}
+
+// RegisterFiles registers shared files
+func (u *URing) RegisterFiles(dp []int) error {
+	err := SysRegister(u.fd, IORING_REGISTER_FILES, unsafe.Pointer(&dp[0]), len(dp))
+	SMP_SQRING.Store(u.sqRing)
+	return err
+}
+
+// UnRegisterFiles unregisters shared files
+func (u *URing) UnRegisterFiles() error {
+	err := SysRegister(u.fd, IORING_UNREGISTER_FILES, unsafe.Pointer(nil), 0)
+	SMP_SQRING.Store(u.sqRing)
+	return err
+}
+
+func (u *URing) RegisterEventFd(fd int) error {
+	eventFd := int32(fd)
+	err := SysRegister(u.fd, IORING_REGISTER_EVENTFD, unsafe.Pointer(&eventFd), 1)
+	return err
+}
+
+func (u *URing) UnRegisterEventFd() error {
+	err := SysRegister(u.fd, IORING_UNREGISTER_EVENTFD, unsafe.Pointer(nil), 0)
+	return err
+}
diff --git a/uring/sys_setup.go b/uring/sys_setup.go
new file mode 100644
index 00000000..a1351db4
--- /dev/null
+++ b/uring/sys_setup.go
@@ -0,0 +1,163 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+import "time"
+
+// ringParams means params of the io_uring instance
+type ringParams struct {
+	sqEntries    uint32
+	cqEntries    uint32
+	flags        uint32
+	sqThreadCPU  uint32
+	sqThreadIdle uint32
+	features     uint32
+	wqFD         uint32
+	resv         [3]uint32
+	sqOffset     sqRingOffsets
+	cqOffset     cqRingOffsets
+}
+
+// sqRingOffsets means offsets of SQ Ring
+type sqRingOffsets struct {
+	head        uint32
+	tail        uint32
+	ringMask    uint32
+	ringEntries uint32
+	flags       uint32
+	dropped     uint32
+	array       uint32
+	resv1       uint32
+	resv2       uint64
+}
+
+// cqRingOffsets means offsets of CQ Ring
+type cqRingOffsets struct {
+	head        uint32
+	tail        uint32
+	ringMask    uint32
+	ringEntries uint32
+	overflow    uint32
+	cqes        uint32
+	flags       uint32
+	resv1       uint32
+	resv2       uint64
+}
+
+// sysSetUp() flags, used to configure the io_uring instance
+const (
+	// IORING_SETUP_IOPOLL, used to show io_context is polled
+	IORING_SETUP_IOPOLL uint32 = 1 << iota
+	// IORING_SETUP_SQPOLL, used to start SQ poll thread
+	IORING_SETUP_SQPOLL
+	// IORING_SETUP_SQ_AFF, used to make sq_thread_cpu valid
+	IORING_SETUP_SQ_AFF
+	// IORING_SETUP_CQSIZE, used to let the app define the CQ size
+	IORING_SETUP_CQSIZE
+	// IORING_SETUP_CLAMP, used to clamp SQ/CQ ring sizes
+	IORING_SETUP_CLAMP
+	// IORING_SETUP_ATTACH_WQ, used to attach to existing wq
+	IORING_SETUP_ATTACH_WQ
+	// IORING_SETUP_R_DISABLED, used to start with ring disabled
+	IORING_SETUP_R_DISABLED
+	// IORING_SETUP_SUBMIT_ALL, used to continue submit on error
+	IORING_SETUP_SUBMIT_ALL
+
+	// Cooperative task running. When requests complete, they often require
+	// forcing the submitter to transition to the kernel to complete. If this
+	// flag is set, work will be done when the task transitions anyway, rather
+	// than force an inter-processor interrupt reschedule. This avoids interrupting
+	// a task running in userspace, and saves an IPI.
+	IORING_SETUP_COOP_TASKRUN
+
+	// If COOP_TASKRUN is set, get notified if task work is available for
+	// running and a kernel transition would be needed to run it. This sets
+	// IORING_SQ_TASKRUN in the sq ring flags. Not valid with COOP_TASKRUN.
+	IORING_SETUP_TASKRUN_FLAG
+	IORING_SETUP_SQE128 // IORING_SETUP_SQE128, SQEs are 128 byte
+	IORING_SETUP_CQE32  // IORING_SETUP_CQE32, CQEs are 32 byte
+
+	// Only one task is allowed to submit requests
+	IORING_SETUP_SINGLE_ISSUER
+)
+
+// Features flags of ringParams
+const (
+	IORING_FEAT_SINGLE_MMAP uint32 = 1 << iota
+	IORING_FEAT_NODROP
+	IORING_FEAT_SUBMIT_STABLE
+	IORING_FEAT_RW_CUR_POS
+	IORING_FEAT_CUR_PERSONALITY
+	IORING_FEAT_FAST_POLL
+	IORING_FEAT_POLL_32BITS
+	IORING_FEAT_SQPOLL_NONFIXED
+	IORING_FEAT_EXT_ARG
+	IORING_FEAT_NATIVE_WORKERS
+	IORING_FEAT_RSRC_TAGS
+	IORING_FEAT_CQE_SKIP
+	IORING_FEAT_LINKED_FILE
+)
+
+// setupOp provides options for the io_uring instance when building
+type setupOp func(params *ringParams)
+
+// ------------------------------------------ implement io_uring_setup ------------------------------------------
+
+// IOPoll performs busy-waiting for an I/O completion, as opposed to
+// getting notifications via an asynchronous IRQ (Interrupt Request)
+func IOPoll() setupOp {
+	return func(params *ringParams) {
+		params.flags |= IORING_SETUP_IOPOLL
+	}
+}
+
+// SQPoll creates a kernel thread to perform submission queue polling.
+func SQPoll(idle time.Duration) setupOp {
+	return func(params *ringParams) {
+		params.flags |= IORING_SETUP_SQPOLL
+		params.sqThreadIdle = uint32(idle.Milliseconds())
+	}
+}
+
+// SQAff binds the poll thread to the cpu set in the sq_thread_cpu field of ringParams.
+// This flag is only meaningful when IORING_SETUP_SQPOLL is specified.
+func SQAff(cpu uint32) setupOp {
+	return func(params *ringParams) {
+		params.flags |= IORING_SETUP_SQ_AFF
+		params.sqThreadCPU = cpu
+	}
+}
+
+// CQSize sets the number of CQ entries (ringParams.cqEntries).
+func CQSize(sz uint32) setupOp {
+	return func(params *ringParams) {
+		params.flags |= IORING_SETUP_CQSIZE
+		params.cqEntries = sz
+	}
+}
+
+func AttachWQ(fd uint32) setupOp {
+	return func(params *ringParams) {
+		params.flags |= IORING_SETUP_ATTACH_WQ
+		params.wqFD = fd
+	}
+}
+
+func URingDisabled() setupOp {
+	return func(params *ringParams) {
+		params.flags |= IORING_SETUP_R_DISABLED
+	}
}
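The setup options above are functional options: each setupOp mutates ringParams before SysSetUp runs. A hedged sketch of combining them; note SQPOLL needs an appropriately privileged process on kernels before 5.11, so real code should handle the error instead of panicking:

package main

import (
	"fmt"
	"time"

	. "github.com/cloudwego/netpoll/uring"
)

func main() {
	// 128 SQ entries, a 4096-entry CQ, and a kernel SQ-polling thread
	// that idles out after 10ms without new submissions.
	u, err := IOURing(128, CQSize(4096), SQPoll(10*time.Millisecond))
	if err != nil {
		panic(err)
	}
	defer u.Close()
	fmt.Println("ring fd:", u.Fd())
}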
+func SysEnter(fd int, toSubmit uint32, minComplete uint32, flags uint32, sig unsafe.Pointer, sz int) (uint, error) { + p, _, err := syscall.Syscall6(SYS_IO_URING_ENTER, uintptr(fd), uintptr(toSubmit), uintptr(minComplete), uintptr(flags), uintptr(sig), uintptr(sz)) + if err != 0 { + return 0, os.NewSyscallError("io_uring_enter", err) + } + return uint(p), nil +} + +// _sizeU32 is size of uint32 +const _sizeU32 = unsafe.Sizeof(uint32(0)) + +// _sizeUR is size of URing +const _sizeUR = unsafe.Sizeof(URing{}) + +// _sizeCQE is size of URingCQE +const _sizeCQE = unsafe.Sizeof(URingCQE{}) + +// _sizeSQE is size of URingSQE +const _sizeSQE = unsafe.Sizeof(URingSQE{}) + +// _sizeEventsArg is size of eventsArg +const _sizeEventsArg = unsafe.Sizeof(eventsArg{}) + +// Init system call numbers +const ( + SYS_IO_URING_SETUP = 425 + SYS_IO_URING_ENTER = 426 + SYS_IO_URING_REGISTER = 427 + + NSIG = 64 +) + +// Flags of uringSQ +const ( + // IORING_SQ_NEED_WAKEUP means needs io_uring_enter wakeup + IORING_SQ_NEED_WAKEUP uint32 = 1 << iota + // IORING_SQ_CQ_OVERFLOW means CQ ring is overflown + IORING_SQ_CQ_OVERFLOW + // IORING_SQ_TASKRUN means task should enter the kernel + IORING_SQ_TASKRUN +) + +// Flags of uringCQ +// IORING_CQ_EVENTFD_DISABLED means disable eventfd notifications +const IORING_CQ_EVENTFD_DISABLED uint32 = 1 << iota + +const INT_FLAG_REG_RING = 1 +const LIBURING_UDATA_TIMEOUT = math.MaxUint64 + +var SMP_SQRING atomic.Value + +func WRITE_ONCE_U32(p *uint32, v uint32) { + atomic.StoreUint32(p, v) +} + +func READ_ONCE_U32(p *uint32) uint32 { + return atomic.LoadUint32(p) +} + +func SMP_STORE_RELEASE_U32(p *uint32, v uint32) { + atomic.StoreUint32(p, v) +} + +func SMP_LOAD_ACQUIRE_U32(p *uint32) uint32 { + return atomic.LoadUint32(p) +} + +func SMP_MEMORY_BARRIER(p **uringSQ) { + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(p)), SMP_SQRING.Load().(unsafe.Pointer)) +} diff --git a/uring/uring.go b/uring/uring.go new file mode 100644 index 00000000..10c090a5 --- /dev/null +++ b/uring/uring.go @@ -0,0 +1,204 @@ +// Copyright 2022 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package uring + +import ( + "runtime" + "syscall" + "time" + "unsafe" +) + +// IOURing create new io_uring instance with Setup Options +func IOURing(entries uint32, ops ...setupOp) (u *URing, err error) { + params := &ringParams{} + for _, op := range ops { + op(params) + } + fd, err := SysSetUp(entries, params) + if err != nil { + return nil, err + } + u = &URing{Params: params, fd: fd, sqRing: &uringSQ{}, cqRing: &uringCQ{}} + err = u.sysMmap(params) + + return +} + +// Fd will return fd of URing +func (u *URing) Fd() int { + return u.fd +} + +// SQE will return a submission queue entry that can be used to submit an I/O operation. 
+func (u *URing) SQE() *URingSQE {
+	return u.sqRing.sqes
+}
+
+// Queue adds an operation to the SQ, setting its flags and user data
+func (u *URing) Queue(op Op, flags uint8, userData uint64) error {
+	sqe, err := u.nextSQE()
+	if err != nil {
+		return err
+	}
+
+	op.Prep(sqe)
+	sqe.setFlags(flags)
+	sqe.setUserData(userData)
+
+	return nil
+}
+
+// Probe implements URing, it returns the io_uring probe
+func (u *URing) Probe() (probe *Probe, err error) {
+	probe = &Probe{}
+	err = SysRegister(u.fd, IORING_REGISTER_PROBE, unsafe.Pointer(probe), 256)
+	SMP_SQRING.Store(u.sqRing)
+	return
+}
+
+// RegisterProbe implements URing
+func (u *URing) RegisterProbe(p *Probe, nrOps int) error {
+	err := SysRegister(u.fd, IORING_REGISTER_PROBE, unsafe.Pointer(p), nrOps)
+	SMP_SQRING.Store(u.sqRing)
+	return err
+}
+
+// Advance implements URing, it must be called after EachCQE()
+func (u *URing) Advance(nr uint32) {
+	if nr != 0 {
+		// Ensure that the kernel only sees the new value of the head
+		// index after the CQEs have been read.
+		SMP_STORE_RELEASE_U32(u.cqRing.kHead, *u.cqRing.kHead+nr)
+	}
+}
+
+// Close implements URing
+func (u *URing) Close() error {
+	return u.sysMunmap()
+}
+
+// ------------------------------------------ implement submission ------------------------------------------
+
+// Submit submits prepared SQEs and returns the number of SQEs submitted.
+func (u *URing) Submit() (uint, error) {
+	return u.submitAndWait(0)
+}
+
+// SubmitAndWait is the same as Submit(), but takes an additional parameter
+// nr that lets you specify how many completions to wait for.
+// This call will block until nr completions have been processed by the kernel
+// and their details placed in the CQ.
+func (u *URing) SubmitAndWait(nr uint32) (uint, error) {
+	return u.submitAndWait(nr)
+}
+
+// ------------------------------------------ implement completion ------------------------------------------
+
+// WaitCQE implements URing, it returns an I/O CQE, waiting for it if necessary
+func (u *URing) WaitCQE() (cqe *URingCQE, err error) {
+	return u.WaitCQENr(1)
+}
+
+// WaitCQENr implements URing, it returns an I/O CQE, waiting for nr completions if they aren't readily available
+func (u *URing) WaitCQENr(nr uint32) (cqe *URingCQE, err error) {
+	return u.getCQE(getData{
+		submit:   0,
+		waitNr:   nr,
+		getFlags: 0,
+		arg:      unsafe.Pointer(nil),
+		sz:       NSIG / 8,
+	})
+}
+
+// WaitCQEs implements URing, like WaitCQE() except it accepts a timeout value as well.
+// Note that an SQE is used internally to handle the timeout. Applications using this function
+// must never set sqe->user_data to LIBURING_UDATA_TIMEOUT.
+func (u *URing) WaitCQEs(nr uint32, timeout time.Duration) (*URingCQE, error) {
+	var toSubmit int64
+
+	if timeout > 0 {
+		if u.Params.features&IORING_FEAT_EXT_ARG != 0 {
+			return u.WaitCQEsNew(nr, timeout)
+		}
+		var err error
+		toSubmit, err = u.submitTimeout(timeout)
+		if toSubmit < 0 {
+			return nil, err
+		}
+	}
+
+	return u.getCQE(getData{
+		submit: uint32(toSubmit),
+		waitNr: nr,
+		arg:    unsafe.Pointer(nil),
+		sz:     NSIG / 8,
+	})
+}
+
+// WaitCQETimeout implements URing, it returns an I/O completion,
+// waiting at most the given timeout for one to become available.
+func (u *URing) WaitCQETimeout(timeout time.Duration) (cqe *URingCQE, err error) {
+	return u.WaitCQEs(1, timeout)
+}
+
+// PeekBatchCQE implements URing, it fills the given slice with I/O CQEs, up to its length,
+// if they are available, and returns the number filled.
+// It does not wait; completions must already be available to be returned by this function.
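+//
+// A typical drain loop (illustrative sketch):
+//
+//	cqes := make([]*URingCQE, 128)
+//	n := u.PeekBatchCQE(cqes)
+//	for i := 0; i < n; i++ {
+//		// handle cqes[i]
+//	}
+//	u.Advance(uint32(n)) // mark all n CQEs as consumed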
+func (u *URing) PeekBatchCQE(cqes []*URingCQE) int {
+	var shift int
+	if u.Params.flags&IORING_SETUP_CQE32 != 0 {
+		shift = 1
+	}
+
+	n := u.peekBatchCQE(cqes, shift)
+
+	if n == 0 {
+		if u.cqRingNeedFlush() {
+			SysEnter(u.fd, 0, 0, IORING_ENTER_GETEVENTS, nil, NSIG/8)
+			SMP_SQRING.Store(u.sqRing)
+			n = u.peekBatchCQE(cqes, shift)
+		}
+	}
+
+	return n
+}
+
+// CQESeen implements URing, it must be called after PeekBatchCQE() or WaitCQE()
+// and after the cqe has been processed by the application.
+func (u *URing) CQESeen() {
+	if u.cqRing.cqes != nil {
+		u.Advance(1)
+	}
+}
+
+// WaitCQEsNew implements URing
+func (u *URing) WaitCQEsNew(nr uint32, timeout time.Duration) (cqe *URingCQE, err error) {
+	ts := syscall.NsecToTimespec(timeout.Nanoseconds())
+	arg := getEventsArg(uintptr(unsafe.Pointer(nil)), NSIG/8, uintptr(unsafe.Pointer(&ts)))
+
+	cqe, err = u.getCQE(getData{
+		submit:   0,
+		waitNr:   nr,
+		getFlags: IORING_ENTER_EXT_ARG,
+		arg:      unsafe.Pointer(arg),
+		sz:       int(_sizeEventsArg),
+	})
+
+	runtime.KeepAlive(arg)
+	runtime.KeepAlive(ts)
+	return
+}
diff --git a/uring/uring_cmplt.go b/uring/uring_cmplt.go
new file mode 100644
index 00000000..24fecb75
--- /dev/null
+++ b/uring/uring_cmplt.go
@@ -0,0 +1,257 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+import (
+	"errors"
+	"syscall"
+	"time"
+	"unsafe"
+)
+
+type getData struct {
+	submit   uint32
+	waitNr   uint32
+	getFlags uint32
+	sz       int
+	arg      unsafe.Pointer
+}
+
+type eventsArg struct {
+	sigMask   uintptr
+	sigMaskSz uint32
+	_pad      uint32
+	ts        uintptr
+}
+
+// getCQE implements URing
+func (u *URing) getCQE(data getData) (cqe *URingCQE, err error) {
+	var looped = false
+	for {
+		var needEnter bool
+		var flags, nrAvail uint32
+
+		nrAvail, cqe, err = u.peekCQE()
+		if err != nil {
+			break
+		}
+
+		if cqe == nil && data.waitNr == 0 && data.submit == 0 {
+			// If we already looped once, we already entered
+			// the kernel. Since there's nothing to submit or
+			// wait for, don't keep retrying.
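+			// One more enter is still needed, though, when the
+			// kernel is holding CQEs in its internal overflow list;
+			// that is what cqRingNeedFlush detects below.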
+			if looped || !u.cqRingNeedFlush() {
+				err = syscall.EAGAIN
+				break
+			}
+			needEnter = true
+		}
+
+		if data.waitNr > nrAvail || needEnter {
+			flags = IORING_ENTER_GETEVENTS | data.getFlags
+			needEnter = true
+		}
+
+		if u.sqRingNeedEnter(data.submit, &flags) {
+			needEnter = true
+		}
+
+		if !needEnter {
+			break
+		}
+
+		if u.Params.flags&INT_FLAG_REG_RING != 0 {
+			flags |= IORING_ENTER_REGISTERED_RING
+		}
+
+		// TODO: TestTimeoutWait not supported
+		var ret uint
+		ret, err = SysEnter(u.fd, data.submit, data.waitNr, flags, data.arg, data.sz)
+		if err != nil {
+			break
+		}
+		SMP_SQRING.Store(u.sqRing)
+
+		data.submit -= uint32(ret)
+		if cqe != nil {
+			break
+		}
+		looped = true
+	}
+	return
+}
+
+// getEventsArg implements URing
+func getEventsArg(sigMask uintptr, sigMaskSz uint32, ts uintptr) *eventsArg {
+	return &eventsArg{sigMask: sigMask, sigMaskSz: sigMaskSz, ts: ts}
+}
+
+// submitTimeout implements URing
+func (u *URing) submitTimeout(timeout time.Duration) (int64, error) {
+	sqe, err := u.nextSQE()
+	if err != nil {
+		_, err = u.Submit()
+		if err != nil {
+			return -1, err
+		}
+
+		sqe, err = u.nextSQE()
+		if err != nil {
+			return -int64(syscall.EAGAIN), err
+		}
+	}
+
+	Timeout(timeout).Prep(sqe)
+	sqe.setUserData(LIBURING_UDATA_TIMEOUT)
+
+	return int64(u.flushSQ()), nil
+}
+
+// peekCQE implements URing
+func (u *URing) peekCQE() (uint32, *URingCQE, error) {
+	mask := *u.cqRing.kRingMask
+	var cqe *URingCQE
+	var avail uint32
+	var err error
+
+	var shift = 0
+	if u.Params.flags&IORING_SETUP_CQE32 != 0 {
+		shift = 1
+	}
+
+	for {
+		tail := SMP_LOAD_ACQUIRE_U32(u.cqRing.kTail)
+		head := SMP_LOAD_ACQUIRE_U32(u.cqRing.kHead)
+
+		cqe = nil
+		avail = tail - head
+		if avail == 0 {
+			break
+		}
+
+		cqe = (*URingCQE)(unsafe.Add(unsafe.Pointer(u.cqRing.cqes), uintptr((head&mask)<<shift)*_sizeCQE))
+
+		// Skip the internal timeout CQE used by submitTimeout.
+		if u.Params.features&IORING_FEAT_EXT_ARG == 0 && cqe.UserData == LIBURING_UDATA_TIMEOUT {
+			if e := cqe.Error(); e != nil {
+				err = e
+			}
+			u.Advance(1)
+			if err == nil {
+				continue
+			}
+			cqe = nil
+		}
+		break
+	}
+	return avail, cqe, err
+}
+
+// peekBatchCQE implements URing
+func (u *URing) peekBatchCQE(cqes []*URingCQE, shift int) int {
+	ready := u.cqRing.ready()
+	if ready == 0 {
+		return 0
+	}
+
+	head := SMP_LOAD_ACQUIRE_U32(u.cqRing.kHead)
+	mask := *u.cqRing.kRingMask
+
+	count := uint32(len(cqes))
+	if count > ready {
+		count = ready
+	}
+	last := head + count
+	for i := 0; head != last; head, i = head+1, i+1 {
+		cqes[i] = (*URingCQE)(unsafe.Add(unsafe.Pointer(u.cqRing.cqes), uintptr((head&mask)<<shift)*_sizeCQE))
+	}
+
+	return int(count)
+}
+
+// nextSQE implements URing
+func (u *URing) nextSQE() (sqe *URingSQE, err error) {
+	head := SMP_LOAD_ACQUIRE_U32(u.sqRing.kHead)
+	next := u.sqRing.sqeTail + 1
+
+	if *u.sqRing.kRingEntries >= next-head {
+		idx := u.sqRing.sqeTail & *u.sqRing.kRingMask * uint32(_sizeSQE)
+		sqe = (*URingSQE)(unsafe.Pointer(&u.sqRing.sqeBuff[idx]))
+		u.sqRing.sqeTail = next
+
+		SMP_SQRING.Store(u.sqRing)
+	} else {
+		err = errors.New("sq ring overflow")
+	}
+	return
+}
+
+// cqRingNeedEnter implements URing
+func (u *URing) cqRingNeedEnter() bool {
+	return u.Params.flags&IORING_SETUP_IOPOLL != 0 || u.cqRingNeedFlush()
+}
+
+// cqRingNeedFlush implements URing
+func (u *URing) cqRingNeedFlush() bool {
+	return READ_ONCE_U32(u.sqRing.kFlags)&(IORING_SQ_CQ_OVERFLOW|IORING_SQ_TASKRUN) != 0
+}
+
+// sqRingNeedEnter implements URing
+func (u *URing) sqRingNeedEnter(submit uint32, flags *uint32) bool {
+	if submit == 0 {
+		return false
+	}
+	if u.Params.flags&IORING_SETUP_SQPOLL == 0 {
+		return true
+	}
+	/*
+	 * Ensure the kernel can see the store to the SQ tail before we read
+	 * the flags.
+	 */
+	SMP_MEMORY_BARRIER(&u.sqRing)
+
+	if READ_ONCE_U32(u.sqRing.kFlags)&IORING_SQ_NEED_WAKEUP != 0 {
+		*flags |= IORING_ENTER_SQ_WAKEUP
+		return true
+	}
+	return false
+}
+
+// ready returns the number of CQEs available to be consumed
+func (c *uringCQ) ready() uint32 {
+	return SMP_LOAD_ACQUIRE_U32(c.kTail) - SMP_LOAD_ACQUIRE_U32(c.kHead)
+}
diff --git a/uring/uring_sbmt.go b/uring/uring_sbmt.go
new file mode 100644
index 00000000..5ad57098
--- /dev/null
+++ b/uring/uring_sbmt.go
@@ -0,0 +1,122 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+import "unsafe"
+
+// URing means I/O Userspace Ring
+type URing struct {
+	cqRing *uringCQ
+	sqRing *uringSQ
+
+	fd int
+
+	Params *ringParams
+}
+
+// uringSQ means Submission Queue
+type uringSQ struct {
+	buff    []byte
+	sqeBuff []byte
+
+	kHead        *uint32
+	kTail        *uint32
+	kRingMask    *uint32
+	kRingEntries *uint32
+	kFlags       *uint32
+	kDropped     *uint32
+	array        *uint32
+	sqes         *URingSQE
+
+	sqeHead uint32
+	sqeTail uint32
+
+	ringSize uint64
+}
+
+// uringCQ means Completion Queue
+type uringCQ struct {
+	buff   []byte
+	kFlags uintptr
+
+	kHead        *uint32
+	kTail        *uint32
+	kRingMask    *uint32
+	kRingEntries *uint32
+	kOverflow    *uint32
+	cqes         *URingCQE
+
+	ringSize uint64
+}
+
+// submitAndWait implements URing
+func (u *URing) submitAndWait(nr uint32) (uint, error) {
+	return u.submit(u.flushSQ(), nr, false)
+}
+
+// submit implements URing
+func (u *URing) submit(submitted uint32, nr uint32, getEvents bool) (uint, error) {
+	cqNeedsEnter := getEvents || nr != 0 || u.cqRingNeedEnter()
+	var flags uint32
+	if u.sqRingNeedEnter(submitted, &flags) || cqNeedsEnter {
+		if cqNeedsEnter {
+			flags |= IORING_ENTER_GETEVENTS
+		}
+		if u.Params.flags&INT_FLAG_REG_RING != 0 {
+			flags |= IORING_ENTER_REGISTERED_RING
+		}
+	} else {
+		return uint(submitted), nil
+	}
+	ret, err := SysEnter(u.fd, submitted, nr, flags, nil, NSIG/8)
+	SMP_SQRING.Store(u.sqRing)
+	return ret, err
+}
+
+// flushSQ implements URing
+func (u *URing) flushSQ() uint32 {
+	mask := *u.sqRing.kRingMask
+	tail := SMP_LOAD_ACQUIRE_U32(u.sqRing.kTail)
+	toSubmit := u.sqRing.sqeTail - u.sqRing.sqeHead
+
+	if toSubmit == 0 {
+		return tail - SMP_LOAD_ACQUIRE_U32(u.sqRing.kHead)
+	}
+
+	for toSubmit > 0 {
+		*(*uint32)(unsafe.Add(unsafe.Pointer(u.sqRing.array), tail&mask*uint32(_sizeU32))) = u.sqRing.sqeHead & mask
+		tail++
+		u.sqRing.sqeHead++
+		toSubmit--
+	}
+
+	/*
+	 * Ensure kernel sees the SQE updates before the tail update.
+	 */
+	SMP_STORE_RELEASE_U32(u.sqRing.kTail, tail)
+
+	/*
+	 * This _may_ look problematic, as we're not supposed to be reading
+	 * SQ->head without acquire semantics. When we're in SQPOLL mode, the
+	 * kernel submitter could be updating this right now. For non-SQPOLL,
+	 * task itself does it, and there's no potential race. But even for
+	 * SQPOLL, the load is going to be potentially out-of-date the very
+	 * instant it's done, regardless of whether or not it's done
+	 * atomically. Worst case, we're going to be over-estimating what
+	 * we can submit. The point is, we need to be able to deal with this
+	 * situation regardless of any perceived atomicity.
+	 */
+	return tail - SMP_LOAD_ACQUIRE_U32(u.sqRing.kHead)
+}
diff --git a/uring/uring_test.go b/uring/uring_test.go
new file mode 100644
index 00000000..cfba9959
--- /dev/null
+++ b/uring/uring_test.go
@@ -0,0 +1,272 @@
+// Copyright 2022 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package uring
+
+import (
+	"errors"
+	"io/ioutil"
+	"os"
+	"syscall"
+	"testing"
+
+	"golang.org/x/sys/unix"
+)
+
+const openFile = "./../go.mod"
+
+func MustNil(t *testing.T, val interface{}) {
+	t.Helper()
+	Assert(t, val == nil, val)
+	if val != nil {
+		t.Fatal("assertion nil failed, val=", val)
+	}
+}
+
+func MustTrue(t *testing.T, cond bool) {
+	t.Helper()
+	if !cond {
+		t.Fatal("assertion true failed.")
+	}
+}
+
+func Equal(t *testing.T, got, expect interface{}) {
+	t.Helper()
+	if got != expect {
+		t.Fatalf("assertion equal failed, got=[%v], expect=[%v]", got, expect)
+	}
+}
+
+func Assert(t *testing.T, cond bool, val ...interface{}) {
+	t.Helper()
+	if !cond {
+		if len(val) > 0 {
+			val = append([]interface{}{"assertion failed:"}, val...)
+			t.Fatal(val...)
+		} else {
+			t.Fatal("assertion failed")
+		}
+	}
+}
+
+func TestClose(t *testing.T) {
+	u, err := IOURing(8)
+	MustNil(t, err)
+	Assert(t, u.Fd() != 0)
+	defer u.Close()
+
+	f, err := os.Open(openFile)
+	MustNil(t, err)
+	defer f.Close()
+
+	err = u.Queue(Close(f.Fd()), 0, 0)
+	MustNil(t, err)
+
+	_, err = u.Submit()
+	MustNil(t, err)
+
+	cqe, err := u.WaitCQE()
+	MustNil(t, err)
+	MustNil(t, cqe.Error())
+
+	_, err = unix.FcntlInt(f.Fd(), unix.F_GETFD, 0)
+	Equal(t, err, unix.EBADF)
+}
+
+func TestReadV(t *testing.T) {
+	u, err := IOURing(8)
+	MustNil(t, err)
+	defer u.Close()
+
+	f, err := os.Open(openFile)
+	MustNil(t, err)
+	defer f.Close()
+
+	v, err := makeV(f, 16)
+	MustNil(t, err)
+
+	err = u.Queue(ReadV(f.Fd(), v, 0), 0, 0)
+	MustNil(t, err)
+
+	_, err = u.Submit()
+	MustNil(t, err)
+
+	cqe, err := u.WaitCQE()
+	MustNil(t, err)
+	MustNil(t, cqe.Error())
+
+	expected, err := ioutil.ReadFile(openFile)
+	MustNil(t, err)
+	Assert(t, vToString(v) == string(expected))
+}
+
+func TestReady(t *testing.T) {
+	u, err := IOURing(8)
+	MustNil(t, err)
+	defer u.Close()
+
+	Equal(t, u.cqRing.ready(), uint32(0))
+
+	err = queueSQEs(u, 5, 0)
+	MustNil(t, err)
+	Equal(t, u.cqRing.ready(), uint32(5))
+
+	u.CQESeen()
+	Equal(t, u.cqRing.ready(), uint32(4))
+
+	u.Advance(4)
+	Equal(t, u.cqRing.ready(), uint32(0))
+}
+
+/* TODO: TestTimeoutWait not supported
+ *
+func TestTimeoutWait(t *testing.T) {
+	u, err := IOURing(8)
+	MustNil(t, err)
+	defer u.Close()
+
+	err = u.Queue(Nop(), 0, 1)
+	MustNil(t, err)
+
+	if u.Params.features&IORING_FEAT_EXT_ARG != 0 {
+		n, err := u.Submit()
+		MustNil(t, err)
+		Equal(t, int(n), 1)
+	}
+
+	n := 0
+	for {
+		cqe, err := u.WaitCQETimeout(time.Second)
+		if errors.Is(err, syscall.ETIME) {
+			break
+		}
+		if errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.EAGAIN) {
+			runtime.Gosched()
+			continue
+		}
+
+		MustNil(t, err)
+		u.CQESeen()
+
+		MustNil(t, cqe.Error())
+		n++
+	}
+	Equal(t, n, 1)
+}
+*/
+
+func TestPeekCQE(t *testing.T) {
+	u, err := IOURing(8)
+	MustNil(t, err)
+	defer u.Close()
+
+	cqeBuff := make([]*URingCQE, 128)
+
+	n := u.PeekBatchCQE(cqeBuff)
+	Equal(t, n, 0)
+
+	err = queueSQEs(u, 4, 0)
+	MustNil(t, err)
+
+	n = u.PeekBatchCQE(cqeBuff)
+	Equal(t, n, 4)
+
+	for i := 0; i < 4; i++ {
+		Equal(t, cqeBuff[i].UserData, uint64(i))
+	}
+
+	err = queueSQEs(u, 4, 4)
+	MustNil(t, err)
+
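+	// Consume the first batch so the next peek only sees the four new CQEs.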
+	u.Advance(4)
+	n = u.PeekBatchCQE(cqeBuff)
+	Equal(t, n, 4)
+
+	for i := 0; i < 4; i++ {
+		Equal(t, cqeBuff[i].UserData, uint64(i+4))
+	}
+
+	u.Advance(4)
+	n = u.PeekBatchCQE(cqeBuff)
+	Equal(t, n, 0)
+}
+
+func TestProbe(t *testing.T) {
+	u, err := IOURing(8)
+	MustNil(t, err)
+	defer u.Close()
+
+	probe, err := u.Probe()
+	if errors.Is(err, syscall.EINVAL) {
+		t.Skip("IORING_REGISTER_PROBE not supported")
+	}
+	MustNil(t, err)
+
+	Assert(t, probe.lastOp != 0)
+}
+
+func TestCQSize(t *testing.T) {
+	u, err := IOURing(8, CQSize(64))
+	MustNil(t, err)
+	Equal(t, u.Params.cqEntries, uint32(64))
+
+	err = u.Close()
+	MustNil(t, err)
+
+	_, err = IOURing(4, CQSize(0))
+	Assert(t, err != nil)
+}
+
+func makeV(f *os.File, vSZ int64) ([][]byte, error) {
+	stat, err := f.Stat()
+	if err != nil {
+		return nil, err
+	}
+
+	bytes := stat.Size()
+	blocks := int(bytes / vSZ)
+	if bytes%vSZ != 0 {
+		blocks++
+	}
+
+	buffs := make([][]byte, 0, blocks)
+	for bytes != 0 {
+		bytesToRead := bytes
+		if bytesToRead > vSZ {
+			bytesToRead = vSZ
+		}
+
+		buffs = append(buffs, make([]byte, bytesToRead))
+		bytes -= bytesToRead
+	}
+
+	return buffs, nil
+}
+
+func vToString(v [][]byte) (str string) {
+	for _, vector := range v {
+		str += string(vector)
+	}
+	return
+}
+
+func queueSQEs(u *URing, count, offset int) (err error) {
+	for i := 0; i < count; i++ {
+		err = u.Queue(Nop(), 0, uint64(i+offset))
+		if err != nil {
+			return
+		}
+	}
+	_, err = u.Submit()
+	return
+}
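
A rough end-to-end sketch of how the pieces above fit together (illustrative only, not part of the patch; it assumes the Nop op and the mmap setup path from uring.go, with error handling kept minimal):

	package main

	import (
		"fmt"

		"github.com/cloudwego/netpoll/uring"
	)

	func main() {
		// Set up a ring with 8 SQ entries.
		u, err := uring.IOURing(8)
		if err != nil {
			panic(err)
		}
		defer u.Close()

		// Prepare a no-op SQE tagged with user data 42 and submit it.
		if err := u.Queue(uring.Nop(), 0, 42); err != nil {
			panic(err)
		}
		if _, err := u.Submit(); err != nil {
			panic(err)
		}

		// Block until the completion arrives, then mark it consumed.
		cqe, err := u.WaitCQE()
		if err != nil {
			panic(err)
		}
		fmt.Println("completed, user data:", cqe.UserData)
		u.CQESeen()
	}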