Skip to content

Commit

Permalink
fix: protect operator detach twice (#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Nov 28, 2023
1 parent 6d9efd0 commit 376fa78
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: Push and Pull Request Check

on: [ push ]
on: [ push, pull_request ]

jobs:
compatibility-test:
strategy:
matrix:
go: [ 1.15, "1.20" ]
go: [ 1.15, "1.21" ]
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
Expand Down
23 changes: 18 additions & 5 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,26 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
if isProcessable(c) {
process(c)
}
for !c.isCloseBy(user) && isProcessable(c) {
// `process` must either eventually read all the input data or actively Close the connection,
// otherwise the goroutine will fall into a dead loop.
var closedBy who
for {
closedBy = c.status(closing)
// close by user or no processable
if closedBy == user || !isProcessable(c) {
break
}
process(c)
}
// Handling callback if connection has been closed.
if !c.IsActive() {
// connection if closed by user when processing, so it needs detach
c.closeCallback(false, true)
if closedBy != none {
// if closed by user when processing, it "may" needs detach
needDetach := closedBy == user
// Here is a conor case that operator will be detached twice:
// If server closed the connection(client OnHup will detach op first and closeBy=poller),
// and then client's OnRequest function also closed the connection(closeBy=user).
// But operator already prevent that detach twice will not cause any problem
c.closeCallback(false, needDetach)
panicked = false
return
}
Expand Down Expand Up @@ -229,7 +242,7 @@ func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) {
if needDetach && c.operator.poll != nil { // If Close is called during OnPrepare, poll is not registered.
// PollDetach only happen when user call conn.Close() or poller detect error
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
logger.Printf("NETPOLL: closeCallback[%v,%v] detach operator failed: %v", needLock, needDetach, err)
}
}
var latest = c.closeCallbacks.Load()
Expand Down
9 changes: 5 additions & 4 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func (c *connection) onHup(p Poll) error {
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closecallback executing,
// It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing,
// and it is safe to close the buffer at this time.
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
var onConnect = c.onConnectCallback.Load()
var onRequest = c.onRequestCallback.Load()
var needCloseByUser = onConnect == nil && onRequest == nil
if !needCloseByUser {
// already PollDetach when call OnHup
c.closeCallback(true, false)
}
Expand Down
98 changes: 98 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,101 @@ func TestParallelShortConnection(t *testing.T) {
time.Sleep(time.Millisecond * 100)
}
}

func TestConnectionServerClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

/*
Client Server
- Client --- connect --> Server
- Client <-- [ping] --- Server
- Client --- [pong] --> Server
- Client <-- close --- Server
- Client --- close --> Server
*/
const PING, PONG = "ping", "pong"
var wg sync.WaitGroup
el, err := NewEventLoop(
func(ctx context.Context, connection Connection) error {
t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
defer wg.Done()
buf, err := connection.Reader().Next(len(PONG)) // pong
Equal(t, string(buf), PONG)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
err = connection.Close()
MustNil(t, err)
return err
},
WithOnConnect(func(ctx context.Context, connection Connection) context.Context {
t.Logf("server.OnConnect: addr=%s", connection.RemoteAddr())
defer wg.Done()
// check OnPrepare
v := ctx.Value("prepare").(string)
Equal(t, v, "true")

_, err := connection.Writer().WriteBinary([]byte(PING))
MustNil(t, err)
err = connection.Writer().Flush()
MustNil(t, err)
connection.AddCloseCallback(func(connection Connection) error {
t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
wg.Done()
return nil
})
return ctx
}),
WithOnPrepare(func(connection Connection) context.Context {
t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
defer wg.Done()
return context.WithValue(context.Background(), "prepare", "true")
}),
)
defer el.Shutdown(context.Background())
go func() {
err := el.Serve(ln)
if err != nil {
t.Logf("servce end with error: %v", err)
}
}()

var clientOnRequest OnRequest = func(ctx context.Context, connection Connection) error {
t.Logf("client.OnRequest: addr=%s", connection.LocalAddr())
defer wg.Done()
buf, err := connection.Reader().Next(len(PING))
MustNil(t, err)
Equal(t, string(buf), PING)

_, err = connection.Writer().WriteBinary([]byte(PONG))
MustNil(t, err)
err = connection.Writer().Flush()
MustNil(t, err)

_, err = connection.Reader().Next(1) // server will not send any data, just wait for server close
MustTrue(t, errors.Is(err, ErrEOF)) // should get EOF when server close

return connection.Close()
}
conns := 100
// server: OnPrepare, OnConnect, OnRequest, CloseCallback
// client: OnRequest, CloseCallback
wg.Add(conns * 6)
for i := 0; i < conns; i++ {
go func() {
conn, err := DialConnection("tcp", ":12345", time.Second)
MustNil(t, err)
err = conn.SetOnRequest(clientOnRequest)
MustNil(t, err)
conn.AddCloseCallback(func(connection Connection) error {
t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
defer wg.Done()
return nil
})
}()
}
//time.Sleep(time.Second)
wg.Wait()
}
7 changes: 7 additions & 0 deletions fd_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ type FDOperator struct {
// poll is the registered location of the file descriptor.
poll Poll

// protect only detach once
detached int32

// private, used by operatorCache
next *FDOperator
state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
index int32 // index in operatorCache
}

func (op *FDOperator) Control(event PollEvent) error {
if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 {
return nil
}
return op.poll.Control(op, event)
}

Expand Down Expand Up @@ -92,4 +98,5 @@ func (op *FDOperator) reset() {
op.Inputs, op.InputAck = nil, nil
op.Outputs, op.OutputAck = nil, nil
op.poll = nil
op.detached = 0
}
8 changes: 4 additions & 4 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
}
// here will copy
b.growth(n)
malloc := b.write.malloc
b.write.malloc += n
return copy(b.write.buf[malloc:b.write.malloc], p), nil
buf := b.write.Malloc(n)
return copy(buf, p), nil
}

// WriteDirect cannot be mixed with WriteString or WriteBinary functions.
Expand Down Expand Up @@ -578,7 +577,8 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
//
// bookSize: The size of data that can be read at once.
// maxSize: The maximum size of data between two Release(). In some cases, this can
// guarantee all data allocated in one node to reduce copy.
//
// guarantee all data allocated in one node to reduce copy.
func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
l := cap(b.write.buf) - b.write.malloc
// grow linkBuffer
Expand Down
8 changes: 4 additions & 4 deletions nocopy_linkbuffer_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
}
// here will copy
b.growth(n)
malloc := b.write.malloc
b.write.malloc += n
return copy(b.write.buf[malloc:b.write.malloc], p), nil
buf := b.write.Malloc(n)
return copy(buf, p), nil
}

// WriteDirect cannot be mixed with WriteString or WriteBinary functions.
Expand Down Expand Up @@ -622,7 +621,8 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
//
// bookSize: The size of data that can be read at once.
// maxSize: The maximum size of data between two Release(). In some cases, this can
// guarantee all data allocated in one node to reduce copy.
//
// guarantee all data allocated in one node to reduce copy.
func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
b.Lock()
defer b.Unlock()
Expand Down

0 comments on commit 376fa78

Please sign in to comment.