diff --git a/net_listener.go b/net_listener.go index 4bebcac6..e7f9edc1 100644 --- a/net_listener.go +++ b/net_listener.go @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) { // tcp var fd, sa, err = syscall.Accept(ln.fd) if err != nil { - if err == syscall.EAGAIN { + /* https://man7.org/linux/man-pages/man2/accept.2.html + EAGAIN or EWOULDBLOCK + The socket is marked nonblocking and no connections are + present to be accepted. POSIX.1-2001 and POSIX.1-2008 + allow either error to be returned for this case, and do + not require these constants to have the same value, so a + portable application should check for both possibilities. + */ + if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { return nil, nil } return nil, err diff --git a/netpoll_server.go b/netpoll_server.go index 2d6c5709..cc0a290f 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -22,6 +22,7 @@ import ( "errors" "strings" "sync" + "syscall" "time" ) @@ -92,39 +93,87 @@ func (s *server) Close(ctx context.Context) error { func (s *server) OnRead(p Poll) error { // accept socket conn, err := s.ln.Accept() - if err != nil { - // shut down - if strings.Contains(err.Error(), "closed") { - s.operator.Control(PollDetach) - s.onQuit(err) - return err + if err == nil { + if conn != nil { + s.onAccept(conn.(Conn)) } - logger.Println("NETPOLL: accept conn failed:", err.Error()) - return err - } - if conn == nil { + // EAGAIN | EWOULDBLOCK if conn and err both nil return nil } + logger.Println("NETPOLL: accept conn failed:", err.Error(), isOutOfFdErr(err)) + + // delay accept when too many open files + if isOutOfFdErr(err) { + // since we use Epoll LT, we have to detach listener fd from epoll first + // and re-register it when accept successfully or there is no available connection + cerr := s.operator.Control(PollDetach) + if cerr != nil { + logger.Println("NETPOLL: detach listener fd failed:", cerr.Error()) + } + go func() { + retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms + retryTimeIndex := 0 + for { + if retryTimeIndex > 0 { + time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond) + } + conn, err := s.ln.Accept() + println(conn, err) + if err == nil { + if conn == nil { + // recovery accept poll loop + s.operator.Control(PollReadable) + println("err and conn == nil") + return + } + s.onAccept(conn.(Conn)) + logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr()) + retryTimeIndex = 0 + continue + } + if retryTimeIndex+1 < len(retryTimes) { + retryTimeIndex++ + } + logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex]) + } + }() + } + + // shut down + if strings.Contains(err.Error(), "closed") { + s.operator.Control(PollDetach) + s.onQuit(err) + return err + } + + return err +} + +// OnHup implements FDOperator. +func (s *server) OnHup(p Poll) error { + s.onQuit(errors.New("listener close")) + return nil +} + +func (s *server) onAccept(conn Conn) { // store & register connection - var connection = &connection{} - connection.init(conn.(Conn), s.opts) - if !connection.IsActive() { - return nil + var nconn = new(connection) + nconn.init(conn, s.opts) + if !nconn.IsActive() { + return } - var fd = conn.(Conn).Fd() - connection.AddCloseCallback(func(connection Connection) error { + var fd = conn.Fd() + nconn.AddCloseCallback(func(connection Connection) error { s.connections.Delete(fd) return nil }) - s.connections.Store(fd, connection) + s.connections.Store(fd, nconn) // trigger onConnect asynchronously - connection.onConnect() - return nil + nconn.onConnect() } -// OnHup implements FDOperator. -func (s *server) OnHup(p Poll) error { - s.onQuit(errors.New("listener close")) - return nil +func isOutOfFdErr(err error) bool { + se, ok := err.(syscall.Errno) + return ok && (se == syscall.EMFILE || se == syscall.ENFILE) } diff --git a/netpoll_test.go b/netpoll_test.go index fb985604..912c4d47 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -21,9 +21,11 @@ import ( "context" "errors" "math/rand" + "os" "runtime" "sync" "sync/atomic" + "syscall" "testing" "time" ) @@ -507,6 +509,77 @@ func TestClientWriteAndClose(t *testing.T) { MustNil(t, err) } +func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { + if os.Getenv("N_LOCAL") == "" { + t.Skip("Only test for debug purpose") + return + } + + var originalRlimit syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + t.Logf("Original RLimit: %v", originalRlimit) + + rlimit := syscall.Rlimit{Cur: 32, Max: originalRlimit.Max} + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + t.Logf("New RLimit: %v", rlimit) + defer func() { // reset + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + }() + + var network, address = "tcp", ":18888" + var connected int32 + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + buf, err := connection.Reader().Next(connection.Reader().Len()) + connection.Writer().WriteBinary(buf) + connection.Writer().Flush() + return err + }, + WithOnConnect(func(ctx context.Context, connection Connection) context.Context { + atomic.AddInt32(&connected, 1) + t.Logf("Conn[%s] accpeted", connection.RemoteAddr()) + return ctx + }), + WithOnDisconnect(func(ctx context.Context, connection Connection) { + t.Logf("Conn[%s] disconnected", connection.RemoteAddr()) + }), + ) + + // out of fds + files := make([]*os.File, 0) + for { + f, err := os.Open("/dev/null") + if err != nil { + Assert(t, isOutOfFdErr(errors.Unwrap(err)), err) + break + } + files = append(files, f) + } + go func() { + time.Sleep(time.Second * 10) + t.Logf("close all files") + for _, f := range files { + f.Close() + } + }() + + // we should use telnet manually + var connections = 1 + for atomic.LoadInt32(&connected) < int32(connections) { + t.Logf("connected=%d", atomic.LoadInt32(&connected)) + time.Sleep(time.Second) + } + time.Sleep(time.Second * 10) + + err = loop.Shutdown(context.Background()) + MustNil(t, err) +} + func createTestListener(network, address string) (Listener, error) { for { ln, err := CreateListener(network, address) diff --git a/test_conns.sh b/test_conns.sh new file mode 100755 index 00000000..33127f52 --- /dev/null +++ b/test_conns.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +ip="$1" +port="$2" +conns="$3" +timeout="$4" + +for i in $(seq 1 $conns); +do + nc -v -w $timeout $ip $port < /dev/null & +done + +wait