diff --git a/net_listener.go b/net_listener.go index 4bebcac6..e7f9edc1 100644 --- a/net_listener.go +++ b/net_listener.go @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) { // tcp var fd, sa, err = syscall.Accept(ln.fd) if err != nil { - if err == syscall.EAGAIN { + /* https://man7.org/linux/man-pages/man2/accept.2.html + EAGAIN or EWOULDBLOCK + The socket is marked nonblocking and no connections are + present to be accepted. POSIX.1-2001 and POSIX.1-2008 + allow either error to be returned for this case, and do + not require these constants to have the same value, so a + portable application should check for both possibilities. + */ + if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { return nil, nil } return nil, err diff --git a/netpoll_server.go b/netpoll_server.go index 2d6c5709..78f26b59 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -22,6 +22,7 @@ import ( "errors" "strings" "sync" + "syscall" "time" ) @@ -92,39 +93,86 @@ func (s *server) Close(ctx context.Context) error { func (s *server) OnRead(p Poll) error { // accept socket conn, err := s.ln.Accept() - if err != nil { - // shut down - if strings.Contains(err.Error(), "closed") { - s.operator.Control(PollDetach) - s.onQuit(err) + if err == nil { + if conn != nil { + s.onAccept(conn.(Conn)) + } + // EAGAIN | EWOULDBLOCK if conn and err both nil + return nil + } + logger.Printf("NETPOLL: accept conn failed: %v", err) + + // delay accept when too many open files + if isOutOfFdErr(err) { + // since we use Epoll LT, we have to detach listener fd from epoll first + // and re-register it when accept successfully or there is no available connection + cerr := s.operator.Control(PollDetach) + if cerr != nil { + logger.Printf("NETPOLL: detach listener fd failed: %v", cerr) return err } - logger.Println("NETPOLL: accept conn failed:", err.Error()) - return err + go func() { + retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms + retryTimeIndex := 0 + for { + if retryTimeIndex > 0 { + time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond) + } + conn, err := s.ln.Accept() + if err == nil { + if conn == nil { + // recovery accept poll loop + s.operator.Control(PollReadable) + return + } + s.onAccept(conn.(Conn)) + logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr()) + retryTimeIndex = 0 + continue + } + if retryTimeIndex+1 < len(retryTimes) { + retryTimeIndex++ + } + logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex]) + } + }() } - if conn == nil { - return nil + + // shut down + if strings.Contains(err.Error(), "closed") { + s.operator.Control(PollDetach) + s.onQuit(err) + return err } + + return err +} + +// OnHup implements FDOperator. +func (s *server) OnHup(p Poll) error { + s.onQuit(errors.New("listener close")) + return nil +} + +func (s *server) onAccept(conn Conn) { // store & register connection - var connection = &connection{} - connection.init(conn.(Conn), s.opts) - if !connection.IsActive() { - return nil + var nconn = new(connection) + nconn.init(conn, s.opts) + if !nconn.IsActive() { + return } - var fd = conn.(Conn).Fd() - connection.AddCloseCallback(func(connection Connection) error { + var fd = conn.Fd() + nconn.AddCloseCallback(func(connection Connection) error { s.connections.Delete(fd) return nil }) - s.connections.Store(fd, connection) + s.connections.Store(fd, nconn) // trigger onConnect asynchronously - connection.onConnect() - return nil + nconn.onConnect() } -// OnHup implements FDOperator. -func (s *server) OnHup(p Poll) error { - s.onQuit(errors.New("listener close")) - return nil +func isOutOfFdErr(err error) bool { + se, ok := err.(syscall.Errno) + return ok && (se == syscall.EMFILE || se == syscall.ENFILE) } diff --git a/netpoll_test.go b/netpoll_test.go index fb985604..b01f0a95 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -21,9 +21,11 @@ import ( "context" "errors" "math/rand" + "os" "runtime" "sync" "sync/atomic" + "syscall" "testing" "time" ) @@ -507,6 +509,78 @@ func TestClientWriteAndClose(t *testing.T) { MustNil(t, err) } +func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { + if os.Getenv("N_LOCAL") == "" { + t.Skip("Only test for debug purpose") + return + } + + var originalRlimit syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + t.Logf("Original RLimit: %v", originalRlimit) + + rlimit := syscall.Rlimit{Cur: 32, Max: originalRlimit.Max} + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + t.Logf("New RLimit: %v", rlimit) + defer func() { // reset + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + }() + + var network, address = "tcp", ":18888" + var connected int32 + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + buf, err := connection.Reader().Next(connection.Reader().Len()) + connection.Writer().WriteBinary(buf) + connection.Writer().Flush() + return err + }, + WithOnConnect(func(ctx context.Context, connection Connection) context.Context { + atomic.AddInt32(&connected, 1) + t.Logf("Conn[%s] accpeted", connection.RemoteAddr()) + return ctx + }), + WithOnDisconnect(func(ctx context.Context, connection Connection) { + t.Logf("Conn[%s] disconnected", connection.RemoteAddr()) + }), + ) + time.Sleep(time.Millisecond * 10) + + // out of fds + files := make([]*os.File, 0) + for { + f, err := os.Open("/dev/null") + if err != nil { + Assert(t, isOutOfFdErr(errors.Unwrap(err)), err) + break + } + files = append(files, f) + } + go func() { + time.Sleep(time.Second * 10) + t.Logf("close all files") + for _, f := range files { + f.Close() + } + }() + + // we should use telnet manually + var connections = 1 + for atomic.LoadInt32(&connected) < int32(connections) { + t.Logf("connected=%d", atomic.LoadInt32(&connected)) + time.Sleep(time.Second) + } + time.Sleep(time.Second * 10) + + err = loop.Shutdown(context.Background()) + MustNil(t, err) +} + func createTestListener(network, address string) (Listener, error) { for { ln, err := CreateListener(network, address) diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index f399765d..3f57268c 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -561,9 +561,17 @@ func (b *UnsafeLinkBuffer) Bytes() []byte { return p[:n] } -// GetBytes will read and fill the slice p as much as possible. +// If p is not passed, return all readable bytes. func (b *UnsafeLinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { node, flush := b.read, b.flush + if len(p) == 0 { + n := 0 + for ; node != flush; node = node.next { + n++ + } + node = b.read + p = make([][]byte, n) + } var i int for i = 0; node != flush && i < len(p); node = node.next { if node.Len() > 0 { diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 504ff2ab..b1f55b32 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -84,6 +84,31 @@ func TestLinkBuffer(t *testing.T) { Equal(t, buf.Len(), 100) } +func TestGetBytes(t *testing.T) { + buf := NewLinkBuffer() + var ( + num = 10 + b = 1 + expectedLen = 0 + ) + for i := 0; i < num; i++ { + expectedLen += b + n, err := buf.WriteBinary(make([]byte, b)) + MustNil(t, err) + Equal(t, n, b) + b *= 10 + } + buf.Flush() + Equal(t, int(buf.length), expectedLen) + bs := buf.GetBytes(nil) + actualLen := 0 + for i := 0; i < len(bs); i++ { + actualLen += len(bs[i]) + } + Equal(t, actualLen, expectedLen) + +} + // TestLinkBufferWithZero test more case with n is invalid. func TestLinkBufferWithInvalid(t *testing.T) { // clean & new diff --git a/test_conns.sh b/test_conns.sh new file mode 100755 index 00000000..33127f52 --- /dev/null +++ b/test_conns.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +ip="$1" +port="$2" +conns="$3" +timeout="$4" + +for i in $(seq 1 $conns); +do + nc -v -w $timeout $ip $port < /dev/null & +done + +wait