From 73edf4b262fba5203f05ac216f68d4454dd70fbc Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Tue, 28 Nov 2023 14:29:18 +0800 Subject: [PATCH] fix: barrier mem leak --- connection_onevent.go | 1 + connection_reactor.go | 1 + netpoll_server.go | 37 ++++++++++++++++++++++++++----------- sys_exec.go | 10 ++++++++++ 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/connection_onevent.go b/connection_onevent.go index 6f055f37..39d4ef58 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -256,6 +256,7 @@ func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) { for callback := latest.(*callbackNode); callback != nil; callback = callback.pre { callback.fn(c) } + c.ctx = nil return nil } diff --git a/connection_reactor.go b/connection_reactor.go index cd5d717c..619c7a15 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -74,6 +74,7 @@ func (c *connection) closeBuffer() { } if c.outputBuffer.Len() == 0 || onConnect != nil || onRequest != nil { c.outputBuffer.Close() + c.outputBarrier.reset() barrierPool.Put(c.outputBarrier) } } diff --git a/netpoll_server.go b/netpoll_server.go index 2d6c5709..2267184a 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -20,8 +20,11 @@ package netpoll import ( "context" "errors" + "log" + "runtime" "strings" "sync" + "sync/atomic" "time" ) @@ -35,11 +38,12 @@ func newServer(ln Listener, opts *options, onQuit func(err error)) *server { } type server struct { - operator FDOperator - ln Listener - opts *options - onQuit func(err error) - connections sync.Map // key=fd, value=connection + operator FDOperator + ln Listener + opts *options + onQuit func(err error) + connections sync.Map // key=fd, value=connection + connectionsNum int64 } // Run this server. @@ -106,20 +110,31 @@ func (s *server) OnRead(p Poll) error { return nil } // store & register connection - var connection = &connection{} - connection.init(conn.(Conn), s.opts) - if !connection.IsActive() { + var nconn = &connection{} + runtime.SetFinalizer(nconn, func(cc *connection) { + log.Printf("netpoll finalized connection: %v", cc) + }) + nconn.init(conn.(Conn), s.opts) + if !nconn.IsActive() { return nil } var fd = conn.(Conn).Fd() - connection.AddCloseCallback(func(connection Connection) error { + nconn.AddCloseCallback(func(nconn Connection) error { s.connections.Delete(fd) + atomic.AddInt64(&s.connectionsNum, -1) + totalConn := 0 + s.connections.Range(func(key, value interface{}) bool { + totalConn++ + return true + }) + log.Printf("netpoll close connection: fd=%d, totalConn=%d", fd, totalConn) return nil }) - s.connections.Store(fd, connection) + s.connections.Store(fd, nconn) + atomic.AddInt64(&s.connectionsNum, 1) // trigger onConnect asynchronously - connection.onConnect() + nconn.onConnect() return nil } diff --git a/sys_exec.go b/sys_exec.go index 1c8e40e4..965aa968 100644 --- a/sys_exec.go +++ b/sys_exec.go @@ -62,6 +62,16 @@ type barrier struct { ivs []syscall.Iovec } +func (b *barrier) reset() { + for i := range b.bs { + b.bs[i] = nil + } + for i := range b.ivs { + b.ivs[i].Base = nil + b.ivs[i].Len = 0 + } +} + // writev wraps the writev system call. func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) { iovLen := iovecs(bs, ivs)