Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: release v0.6.0 #316

Merged
merged 13 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v5
with:
go-version: "1.20"

Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ jobs:
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
# - uses: actions/cache@v2
Expand All @@ -28,9 +28,9 @@ jobs:
windows-test:
runs-on: windows-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: "1.20"
# - uses: actions/cache@v2
Expand All @@ -44,9 +44,9 @@ jobs:
style-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: 1.16
- name: Check License Header
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Check Source Branch
run: python2 -c "exit(0 if '${{ github.head_ref }}'.startswith('release') or '${{ github.head_ref }}'.startswith('hotfix') else 1)"
Expand Down
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"
)

// CloseCallback will be called when the connection is closed.
// CloseCallback will be called after the connection is closed.
// Return: error is unused which will be ignored directly.
type CloseCallback func(connection Connection) error

Expand Down
20 changes: 20 additions & 0 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package netpoll

import (
"fmt"
"net"
"syscall"
)

Expand Down Expand Up @@ -51,6 +52,10 @@ func Exception(err error, suffix string) error {
return &exception{no: no, suffix: suffix}
}

var (
_ net.Error = (*exception)(nil)
)

type exception struct {
no syscall.Errno
suffix string
Expand Down Expand Up @@ -88,6 +93,21 @@ func (e *exception) Unwrap() error {
return e.no
}

func (e *exception) Timeout() bool {
switch e.no {
case ErrDialTimeout, ErrReadTimeout, ErrWriteTimeout:
return true
}
if e.no.Timeout() {
return true
}
return false
}

func (e *exception) Temporary() bool {
return e.no.Temporary()
}

// Errors defined in netpoll
var errnos = [...]string{
ErrnoMask & ErrConnClosed: "connection has been closed",
Expand Down
8 changes: 5 additions & 3 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type connection struct {
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
state int32 // 0: not connected, 1: connected, 2: disconnected. Connection state should be changed sequentially.
}

var (
Expand Down Expand Up @@ -323,6 +324,7 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier)
c.state = 0

c.initNetFD(conn) // conn must be *netFD{}
c.initFDOperator()
Expand Down Expand Up @@ -447,7 +449,7 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
return Exception(ErrReadTimeout, c.remoteAddr.String())
case err = <-c.readTrigger:
if err != nil {
return err
goto RET
}
continue
}
Expand Down
1 change: 1 addition & 0 deletions connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type key int32

const (
closing key = iota
connecting
processing
flushing
// total must be at the bottom.
Expand Down
88 changes: 78 additions & 10 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (

var runTask = gopool.CtxGo

func setRunner(runner func(ctx context.Context, f func())) {
runTask = runner
}

func disableGopool() error {
runTask = func(ctx context.Context, f func()) {
go f()
Expand All @@ -44,10 +48,11 @@ type gracefulExit interface {
// OnPrepare, OnRequest, CloseCallback share the lock processing,
// which is a CAS lock and can only be cleared by OnRequest.
type onEvent struct {
ctx context.Context
onConnectCallback atomic.Value
onRequestCallback atomic.Value
closeCallbacks atomic.Value // value is latest *callbackNode
ctx context.Context
onConnectCallback atomic.Value
onDisconnectCallback atomic.Value
onRequestCallback atomic.Value
closeCallbacks atomic.Value // value is latest *callbackNode
}

type callbackNode struct {
Expand All @@ -63,6 +68,14 @@ func (c *connection) SetOnConnect(onConnect OnConnect) error {
return nil
}

// SetOnDisconnect set the OnDisconnect callback.
func (c *connection) SetOnDisconnect(onDisconnect OnDisconnect) error {
if onDisconnect != nil {
c.onDisconnectCallback.Store(onDisconnect)
}
return nil
}

// SetOnRequest initialize ctx when setting OnRequest.
func (c *connection) SetOnRequest(onRequest OnRequest) error {
if onRequest == nil {
Expand Down Expand Up @@ -95,6 +108,7 @@ func (c *connection) AddCloseCallback(callback CloseCallback) error {
func (c *connection) onPrepare(opts *options) (err error) {
if opts != nil {
c.SetOnConnect(opts.onConnect)
c.SetOnDisconnect(opts.onDisconnect)
c.SetOnRequest(opts.onRequest)
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
Expand All @@ -120,23 +134,36 @@ func (c *connection) onPrepare(opts *options) (err error) {
func (c *connection) onConnect() {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
atomic.StoreInt32(&c.state, 1)
return
}
if !c.lock(connecting) {
// it never happens because onDisconnect will not lock connecting if c.connected == 0
return
}
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
var connected int32
c.onProcess(
// only process when conn active and have unread data
func(c *connection) bool {
// if onConnect not called
if atomic.LoadInt32(&connected) == 0 {
if atomic.LoadInt32(&c.state) == 0 {
return true
}
// check for onRequest
return onRequest != nil && c.Reader().Len() > 0
},
func(c *connection) {
if atomic.CompareAndSwapInt32(&connected, 0, 1) {
if atomic.CompareAndSwapInt32(&c.state, 0, 1) {
c.ctx = onConnect(c.ctx, c)

if !c.IsActive() && atomic.CompareAndSwapInt32(&c.state, 1, 2) {
// since we hold connecting lock, so we should help to call onDisconnect here
var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect)
if onDisconnect != nil {
onDisconnect(c.ctx, c)
}
}
c.unlock(connecting)
return
}
if onRequest != nil {
Expand All @@ -146,12 +173,44 @@ func (c *connection) onConnect() {
)
}

// when onDisconnect called, c.IsActive() must return false
func (c *connection) onDisconnect() {
var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect)
if onDisconnect == nil {
return
}
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
// no need lock if onConnect is nil
atomic.StoreInt32(&c.state, 2)
onDisconnect(c.ctx, c)
return
}
// check if OnConnect finished when onConnect != nil && onDisconnect != nil
if atomic.LoadInt32(&c.state) > 0 && c.lock(connecting) { // means OnConnect already finished
// protect onDisconnect run once
// if CAS return false, means OnConnect already helps to run onDisconnect
if atomic.CompareAndSwapInt32(&c.state, 1, 2) {
onDisconnect(c.ctx, c)
}
c.unlock(connecting)
return
}
// OnConnect is not finished yet, return and let onConnect helps to call onDisconnect
return
}

// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {
var onRequest, ok = c.onRequestCallback.Load().(OnRequest)
if !ok {
return true
}
// wait onConnect finished first
if atomic.LoadInt32(&c.state) == 0 && c.onConnectCallback.Load() != nil {
// let onConnect to call onRequest
return
}
processed := c.onProcess(
// only process when conn active and have unread data
func(c *connection) bool {
Expand Down Expand Up @@ -206,7 +265,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
}
process(c)
}
// Handling callback if connection has been closed.
// handling callback if connection has been closed.
if closedBy != none {
// if closed by user when processing, it "may" needs detach
needDetach := closedBy == user
Expand All @@ -219,15 +278,24 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
return
}
c.unlock(processing)
// Double check when exiting.
// Note: Poller's closeCallback call will try to get processing lock failed but here already neer to unlock processing.
// So here we need to check connection state again, to avoid connection leak
// double check close state
if c.status(closing) != 0 && c.lock(processing) {
// poller will get the processing lock failed, here help poller do closeCallback
// fd must already detach by poller
c.closeCallback(false, false)
panicked = false
return
}
// double check isProcessable
if isProcessable(c) && c.lock(processing) {
goto START
}
// task exits
panicked = false
return
}

runTask(c.ctx, task)
return true
}
Expand Down
4 changes: 4 additions & 0 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (c *connection) onHup(p Poll) error {
}
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))

// call Disconnect callback first
c.onDisconnect()

// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing,
// and it is safe to close the buffer at this time.
Expand Down
7 changes: 5 additions & 2 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,10 @@ func TestParallelShortConnection(t *testing.T) {
var received int64
el, err := NewEventLoop(func(ctx context.Context, connection Connection) error {
data, err := connection.Reader().Next(connection.Reader().Len())
atomic.AddInt64(&received, int64(len(data)))
if err != nil {
return err
}
atomic.AddInt64(&received, int64(len(data)))
//t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
return nil
})
Expand Down Expand Up @@ -536,10 +536,13 @@ func TestParallelShortConnection(t *testing.T) {
}
wg.Wait()

for atomic.LoadInt64(&received) < int64(totalSize) {
count := 100
for count > 0 && atomic.LoadInt64(&received) < int64(totalSize) {
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
time.Sleep(time.Millisecond * 100)
count--
}
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
}

func TestConnectionServerClose(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ type EventLoop interface {
Shutdown(ctx context.Context) error
}

/* The Connection Callback Sequence Diagram
| Connection State | Callback Function | Notes
| Connected but not initialized | OnPrepare | Conn is not registered into poller
| Connected and initialized | OnConnect | Conn is ready for read or write
| Read first byte | OnRequest | Conn is ready for read or write
| Peer closed but conn is active | OnDisconnect | Conn access will race with OnRequest function
| Self closed and conn is closed | CloseCallback | Conn is destroyed

Execution Order:
OnPrepare => OnConnect => OnRequest => CloseCallback
OnDisconnect
Note: only OnRequest and OnDisconnect will be executed in parallel
*/

// OnPrepare is used to inject custom preparation at connection initialization,
// which is optional but important in some scenarios. For example, a qps limiter
// can be set by closing overloaded connections directly in OnPrepare.
Expand Down Expand Up @@ -63,6 +77,11 @@ type OnPrepare func(connection Connection) context.Context
// }
type OnConnect func(ctx context.Context, connection Connection) context.Context

// OnDisconnect is called once connection is going to be closed.
// OnDisconnect must return as quick as possible because it will block poller.
// OnDisconnect is different from CloseCallback, you could check with "The Connection Callback Sequence Diagram" section.
type OnDisconnect func(ctx context.Context, connection Connection)

// OnRequest defines the function for handling connection. When data is sent from the connection peer,
// netpoll actively reads the data in LT mode and places it in the connection's input buffer.
// Generally, OnRequest starts handling the data in the following way:
Expand Down
Loading
Loading