From a1c0872e2b931612dba67e31c787e825ad861e01 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 22 Feb 2024 11:23:33 +0800 Subject: [PATCH] fix: discard connections if out of fd --- net_listener.go | 10 ++++- netpoll_server.go | 107 +++++++++++++++++++++++++++++++++----------- netpoll_test.go | 86 +++++++++++++++++++++++++++++++++++ poll_default_bsd.go | 1 + test_conns.sh | 13 ++++++ 5 files changed, 190 insertions(+), 27 deletions(-) create mode 100755 test_conns.sh diff --git a/net_listener.go b/net_listener.go index 4bebcac6..e7f9edc1 100644 --- a/net_listener.go +++ b/net_listener.go @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) { // tcp var fd, sa, err = syscall.Accept(ln.fd) if err != nil { - if err == syscall.EAGAIN { + /* https://man7.org/linux/man-pages/man2/accept.2.html + EAGAIN or EWOULDBLOCK + The socket is marked nonblocking and no connections are + present to be accepted. POSIX.1-2001 and POSIX.1-2008 + allow either error to be returned for this case, and do + not require these constants to have the same value, so a + portable application should check for both possibilities. + */ + if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { return nil, nil } return nil, err diff --git a/netpoll_server.go b/netpoll_server.go index 2d6c5709..a85fdc7c 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -20,17 +20,21 @@ package netpoll import ( "context" "errors" + "os" "strings" "sync" + "syscall" "time" ) // newServer wrap listener into server, quit will be invoked when server exit. func newServer(ln Listener, opts *options, onQuit func(err error)) *server { + f, _ := os.Open("/dev/null") return &server{ - ln: ln, - opts: opts, - onQuit: onQuit, + ln: ln, + opts: opts, + onQuit: onQuit, + urgentfd: f, } } @@ -40,6 +44,7 @@ type server struct { opts *options onQuit func(err error) connections sync.Map // key=fd, value=connection + urgentfd *os.File } // Run this server. @@ -92,39 +97,89 @@ func (s *server) Close(ctx context.Context) error { func (s *server) OnRead(p Poll) error { // accept socket conn, err := s.ln.Accept() - if err != nil { - // shut down - if strings.Contains(err.Error(), "closed") { - s.operator.Control(PollDetach) - s.onQuit(err) - return err + if err == nil { + if conn != nil { + s.onAccept(conn.(Conn)) } - logger.Println("NETPOLL: accept conn failed:", err.Error()) - return err - } - if conn == nil { + // EAGAIN | EWOULDBLOCK if conn and err both nil return nil } + logger.Println("NETPOLL: accept conn failed:", err.Error(), isOutOfFdErr(err)) + + // delay accept when too many open files + if false && isOutOfFdErr(err) { + // since we use Epoll LT, we have to detach listener fd from epoll first + // and re-register it when accept successfully or there is no available connection + cerr := s.operator.Control(PollDetach) + if cerr != nil { + logger.Println("NETPOLL: detach listener fd failed:", cerr.Error()) + } + go func() { + println("create") + defer println("finish") + retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms + retryTimeIndex := 0 + for { + if retryTimeIndex > 0 { + time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond) + } + conn, err := s.ln.Accept() + println(conn, err) + if err == nil { + if conn == nil { + // recovery accept poll loop + s.operator.Control(PollReadable) + println("err and conn == nil") + return + } + s.onAccept(conn.(Conn)) + logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr()) + retryTimeIndex = 0 + continue + } + if retryTimeIndex+1 < len(retryTimes) { + retryTimeIndex++ + } + logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex]) + } + }() + } + + // shut down + if strings.Contains(err.Error(), "closed") { + s.operator.Control(PollDetach) + s.onQuit(err) + return err + } + + return err +} + +// OnHup implements FDOperator. +func (s *server) OnHup(p Poll) error { + s.onQuit(errors.New("listener close")) + return nil +} + +func (s *server) onAccept(conn Conn) { // store & register connection - var connection = &connection{} - connection.init(conn.(Conn), s.opts) - if !connection.IsActive() { - return nil + var nconn = new(connection) + nconn.init(conn, s.opts) + if !nconn.IsActive() { + return } - var fd = conn.(Conn).Fd() - connection.AddCloseCallback(func(connection Connection) error { + var fd = conn.Fd() + nconn.AddCloseCallback(func(connection Connection) error { s.connections.Delete(fd) return nil }) - s.connections.Store(fd, connection) + s.connections.Store(fd, nconn) // trigger onConnect asynchronously - connection.onConnect() - return nil + nconn.onConnect() } -// OnHup implements FDOperator. -func (s *server) OnHup(p Poll) error { - s.onQuit(errors.New("listener close")) - return nil +func isOutOfFdErr(err error) bool { + se, ok := err.(syscall.Errno) + return ok && (se == syscall.EMFILE || se == syscall.ENFILE) } diff --git a/netpoll_test.go b/netpoll_test.go index fb985604..b11c950d 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -18,12 +18,17 @@ package netpoll import ( + "bufio" "context" "errors" + "fmt" "math/rand" + "os/exec" "runtime" + "strconv" "sync" "sync/atomic" + "syscall" "testing" "time" ) @@ -507,6 +512,62 @@ func TestClientWriteAndClose(t *testing.T) { MustNil(t, err) } +func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { + //t.Skip("Only test for debug purpose") + + var originalRlimit syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + t.Logf("Original RLimit: %v", originalRlimit) + + var connections = 100 + rlimit := syscall.Rlimit{Cur: uint64(connections), Max: originalRlimit.Max} + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + t.Logf("New RLimit: %v", rlimit) + defer func() { // reset + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + }() + + var network, address = "tcp", ":18888" + var connected int32 + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + buf, err := connection.Reader().Next(connection.Reader().Len()) + connection.Writer().WriteBinary(buf) + connection.Writer().Flush() + return err + }, + WithOnConnect(func(ctx context.Context, connection Connection) context.Context { + atomic.AddInt32(&connected, 1) + t.Logf("Conn[%s] accpeted", connection.RemoteAddr()) + go func() { + time.Sleep(time.Second * 100) + connection.Close() + }() + return ctx + }), + WithOnDisconnect(func(ctx context.Context, connection Connection) { + t.Logf("Conn[%s] disconnected", connection.RemoteAddr()) + }), + ) + + go func() { + err = createTestConnection("localhost", "18888", connections, 10*time.Second) + MustNil(t, err) + }() + for atomic.LoadInt32(&connected) < int32(connections) { + t.Logf("connected=%d", atomic.LoadInt32(&connected)) + time.Sleep(time.Millisecond * 500) + } + + err = loop.Shutdown(context.Background()) + MustNil(t, err) +} + func createTestListener(network, address string) (Listener, error) { for { ln, err := CreateListener(network, address) @@ -517,6 +578,31 @@ func createTestListener(network, address string) (Listener, error) { } } +func createTestConnection(ip string, port string, conns int, duration time.Duration) error { + sconns := strconv.Itoa(conns) + sduration := strconv.Itoa(int(duration.Seconds())) + cmd := exec.Command("./test_conns.sh", ip, port, sconns, sduration) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + defer stdout.Close() + go func() { + reader := bufio.NewReader(stdout) + for { + line, err := reader.ReadString('\n') + if err != nil { + break + } + fmt.Print(line) + _ = line + } + }() + + return cmd.Run() +} + func newTestEventLoop(network, address string, onRequest OnRequest, opts ...Option) EventLoop { ln, err := createTestListener(network, address) if err != nil { diff --git a/poll_default_bsd.go b/poll_default_bsd.go index 9c8aa8c9..4b5e809a 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -100,6 +100,7 @@ func (p *defaultPoll) Wait() error { if operator.OnRead != nil { // for non-connection operator.OnRead(p) + println("operator OnRead", operator.FD, n) } else { // only for connection var bs = operator.Inputs(barriers[i].bs) diff --git a/test_conns.sh b/test_conns.sh new file mode 100755 index 00000000..91ae75a3 --- /dev/null +++ b/test_conns.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +ip="$1" +port="$2" +conns="$3" +timeout="$4" + +for i in $(seq 1 $conns); +do + nc -v -d -w $timeout $ip $port < /dev/null & +done + +wait